# Copyright 2022 - 2026 The PyMC Labs Developers
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Functions that generate data sets used in examples
"""
import numpy as np
import pandas as pd
from scipy.stats import dirichlet, gamma, norm, uniform
from statsmodels.nonparametric.smoothers_lowess import lowess
default_lowess_kwargs: dict[str, float | int] = {"frac": 0.2, "it": 0}
RANDOM_SEED: int = 8927
rng: np.random.Generator = np.random.default_rng(RANDOM_SEED)
def _smoothed_gaussian_random_walk(
gaussian_random_walk_mu: float,
gaussian_random_walk_sigma: float,
N: int,
lowess_kwargs: dict,
) -> tuple[np.ndarray, np.ndarray]:
"""
Generates Gaussian random walk data and applies LOWESS.
:param gaussian_random_walk_mu:
Mean of the random walk
:param gaussian_random_walk_sigma:
Standard deviation of the random walk
:param N:
Length of the random walk
:param lowess_kwargs:
Keyword argument dictionary passed to statsmodels lowess
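
    Example
    --------
    >>> from causalpy.data.simulate_data import _smoothed_gaussian_random_walk
    >>> x, y = _smoothed_gaussian_random_walk(0.25, 1, 100, {"frac": 0.2, "it": 0})
    >>> y.shape
    (100,)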
"""
x = np.arange(N)
y = norm(gaussian_random_walk_mu, gaussian_random_walk_sigma).rvs(N).cumsum()
filtered = lowess(y, x, **lowess_kwargs)
y = filtered[:, 1]
return (x, y)
def generate_synthetic_control_data(
N: int = 100,
treatment_time: int = 70,
grw_mu: float = 0.25,
grw_sigma: float = 1,
lowess_kwargs: dict = default_lowess_kwargs,
) -> tuple[pd.DataFrame, np.ndarray]:
"""
Generates data for synthetic control example.
:param N:
Number of data points
:param treatment_time:
Index where treatment begins in the generated dataframe
:param grw_mu:
Mean of Gaussian Random Walk
:param grw_sigma:
Standard deviation of Gaussian Random Walk
    :param lowess_kwargs:
Keyword argument dictionary passed to statsmodels lowess
Example
--------
>>> from causalpy.data.simulate_data import generate_synthetic_control_data
>>> df, weightings_true = generate_synthetic_control_data(treatment_time=70)
"""
# 1. Generate non-treated variables
    df = pd.DataFrame(
        {
            name: _smoothed_gaussian_random_walk(grw_mu, grw_sigma, N, lowess_kwargs)[1]
            for name in ["a", "b", "c", "d", "e", "f", "g"]
        }
    )
# 2. Generate counterfactual, based on weighted sum of non-treated variables. This
# is the counterfactual with NO treatment.
weightings_true = dirichlet(np.ones(7)).rvs(1)
df["counterfactual"] = np.dot(df.to_numpy(), weightings_true.T)
# 3. Generate the causal effect
causal_effect = gamma(10).pdf(np.arange(0, N, 1) - treatment_time)
df["causal effect"] = causal_effect * -50
# 4. Generate the actually observed data, ie the treated with the causal effect
# applied
df["actual"] = df["counterfactual"] + df["causal effect"]
# 5. apply observation noise to all relevant variables
for var in ["actual", "a", "b", "c", "d", "e", "f", "g"]:
df[var] += norm(0, 0.25).rvs(N)
return df, weightings_true
def generate_time_series_data(
N: int = 100,
treatment_time: int = 70,
beta_temp: float = -1,
beta_linear: float = 0.5,
beta_intercept: float = 3,
) -> pd.DataFrame:
"""
Generates interrupted time series example data
:param N:
Length of the time series
:param treatment_time:
Index of when treatment begins
:param beta_temp:
The temperature coefficient
:param beta_linear:
The linear coefficient
:param beta_intercept:
The intercept
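
    Example
    --------
    >>> from causalpy.data.simulate_data import generate_time_series_data
    >>> df = generate_time_series_data(N=100, treatment_time=70)
    >>> df.shape
    (100, 6)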
"""
x = np.arange(0, N, 1)
df = pd.DataFrame(
{
"temperature": np.sin(x * 0.5) + 1,
"linear": np.linspace(0, 1, N),
"causal effect": 10 * gamma(10).pdf(np.arange(0, N, 1) - treatment_time),
}
)
df["deaths_counterfactual"] = (
beta_intercept + beta_temp * df["temperature"] + beta_linear * df["linear"]
)
# generate the actually observed data
# ie the treated with the causal effect applied
df["deaths_actual"] = df["deaths_counterfactual"] + df["causal effect"]
# apply observation noise to all relevant variables
# NOTE: no observation noise on the linear trend component
for var in ["deaths_actual", "temperature"]:
df[var] += norm(0, 0.1).rvs(N)
# add intercept column of ones (for modeling purposes)
# This is correctly a column of ones, not beta_intercept, as beta_intercept
# is already incorporated in the data generation above
df["intercept"] = np.ones(df.shape[0])
return df
def generate_time_series_data_seasonal(
treatment_time: pd.Timestamp,
) -> pd.DataFrame:
"""
    Generates 10 years of monthly data with a linear trend and monthly seasonality

    :param treatment_time:
        Timestamp at which treatment begins
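
    Example
    --------
    >>> import pandas as pd
    >>> from causalpy.data.simulate_data import generate_time_series_data_seasonal
    >>> df = generate_time_series_data_seasonal(pd.to_datetime("2017-01-01"))
    >>> df.shape
    (120, 6)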
"""
dates = pd.date_range(
start=pd.to_datetime("2010-01-01"), end=pd.to_datetime("2020-01-01"), freq="M"
)
df = pd.DataFrame(data={"date": dates})
df = df.assign(
year=lambda x: x["date"].dt.year,
month=lambda x: x["date"].dt.month,
t=df.index,
).set_index("date", drop=True)
month_effect = np.array([11, 13, 12, 15, 19, 23, 21, 28, 20, 17, 15, 12])
df["y"] = 0.2 * df["t"] + 2 * month_effect[np.asarray(df.month.values) - 1]
N = df.shape[0]
idx = np.arange(N)[df.index > treatment_time]
df["causal effect"] = 100 * gamma(10).pdf(
np.array(np.arange(0, N, 1)) - int(np.min(idx))
)
df["y"] += df["causal effect"]
df["y"] += norm(0, 2).rvs(N)
# add intercept
df["intercept"] = np.ones(df.shape[0])
return df
def generate_time_series_data_simple(
treatment_time: pd.Timestamp, slope: float = 0.0
) -> pd.DataFrame:
"""Generate simple interrupted time series data, with no seasonality or temporal
    structure.

    :param treatment_time:
        Timestamp at which treatment begins
    :param slope:
        Slope of the linear trend
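
    Example
    --------
    >>> import pandas as pd
    >>> from causalpy.data.simulate_data import generate_time_series_data_simple
    >>> df = generate_time_series_data_simple(
    ...     treatment_time=pd.to_datetime("2017-01-01"), slope=0.1
    ... )
    >>> df.shape
    (120, 4)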
"""
dates = pd.date_range(
start=pd.to_datetime("2010-01-01"), end=pd.to_datetime("2020-01-01"), freq="M"
)
df = pd.DataFrame(data={"date": dates})
df = df.assign(
linear_trend=df.index,
).set_index("date", drop=True)
df["timeseries"] = slope * df["linear_trend"]
N = df.shape[0]
df["causal effect"] = (df.index > treatment_time) * 2
df["timeseries"] += df["causal effect"]
# add intercept
df["intercept"] = np.ones(df.shape[0])
# add observation noise
df["timeseries"] += norm(0, 0.25).rvs(N)
return df
def generate_did() -> pd.DataFrame:
"""
Generate Difference in Differences data
Example
--------
>>> from causalpy.data.simulate_data import generate_did
>>> df = generate_did()
"""
# true parameters
control_intercept = 1
treat_intercept_delta = 0.25
trend = 1
Δ = 0.5
intervention_time = 0.5
# local functions
def outcome(
t: np.ndarray,
control_intercept: float,
treat_intercept_delta: float,
trend: float,
Δ: float,
group: np.ndarray,
post_treatment: np.ndarray,
) -> np.ndarray:
"""Compute the outcome of each unit"""
return (
control_intercept
+ (treat_intercept_delta * group)
+ (t * trend)
+ (Δ * post_treatment * group)
)
df = pd.DataFrame(
{
"group": [0, 0, 1, 1] * 10,
"t": [0.0, 1.0, 0.0, 1.0] * 10,
"unit": np.concatenate([[i] * 2 for i in range(20)]),
}
)
df["post_treatment"] = df["t"] > intervention_time
df["y"] = outcome(
np.asarray(df["t"]),
control_intercept,
treat_intercept_delta,
trend,
Δ,
np.asarray(df["group"]),
np.asarray(df["post_treatment"]),
)
df["y"] += rng.normal(0, 0.1, df.shape[0])
return df
def generate_regression_discontinuity_data(
N: int = 100, true_causal_impact: float = 0.5, true_treatment_threshold: float = 0.0
) -> pd.DataFrame:
"""
Generate regression discontinuity example data
Example
--------
>>> import pathlib
>>> from causalpy.data.simulate_data import generate_regression_discontinuity_data
>>> df = generate_regression_discontinuity_data(true_treatment_threshold=0.5)
>>> df.to_csv(
... pathlib.Path.cwd() / "regression_discontinuity.csv", index=False
... ) # doctest: +SKIP
"""
def is_treated(x: np.ndarray) -> np.ndarray:
"""Check if x was treated"""
return np.greater_equal(x, true_treatment_threshold)
def impact(x: np.ndarray) -> np.ndarray:
"""Assign true_causal_impact to all treated entries"""
y = np.zeros(len(x))
y[is_treated(x)] = true_causal_impact
return y
x = np.sort((uniform.rvs(size=N) - 0.5) * 2)
y = np.sin(x * 3) + impact(x) + norm.rvs(scale=0.1, size=N)
return pd.DataFrame({"x": x, "y": y, "treated": is_treated(x)})
def generate_ancova_data(
N: int = 200,
pre_treatment_means: np.ndarray | None = None,
treatment_effect: int = 2,
sigma: int = 1,
) -> pd.DataFrame:
"""
Generate ANCOVA example data
Example
--------
>>> import pathlib
>>> from causalpy.data.simulate_data import generate_ancova_data
>>> df = generate_ancova_data(
... N=200, pre_treatment_means=np.array([10, 12]), treatment_effect=2, sigma=1
... )
>>> df.to_csv(pathlib.Path.cwd() / "ancova_data.csv", index=False) # doctest: +SKIP
"""
if pre_treatment_means is None:
pre_treatment_means = np.array([10, 12])
group = np.random.choice(2, size=N)
pre = np.random.normal(loc=pre_treatment_means[group])
post = pre + treatment_effect * group + np.random.normal(size=N) * sigma
df = pd.DataFrame({"group": group, "pre": pre, "post": post})
return df
def generate_geolift_data() -> pd.DataFrame:
"""Generate synthetic data for a geolift example. This will consists of 6 untreated
countries. The treated unit `Denmark` is a weighted combination of the untreated
units. We additionally specify a treatment effect which takes effect after the
`treatment_time`. The timeseries data is observed at weekly resolution and has
annual seasonality, with this seasonality being a drawn from a Gaussian Process with
a periodic kernel."""
n_years = 4
treatment_time = pd.to_datetime("2022-01-01")
causal_impact = 0.2
time = pd.date_range(start="2019-01-01", periods=52 * n_years, freq="W")
untreated = [
"Austria",
"Belgium",
"Bulgaria",
"Croatia",
"Cyprus",
"Czech_Republic",
]
df = (
pd.DataFrame(
{
country: create_series(n_years=n_years, intercept=3)
for country in untreated
}
)
.assign(time=time)
.set_index("time")
)
# create treated unit as a weighted sum of the untreated units
weights = np.random.dirichlet(np.ones(len(untreated)), size=1)[0]
df = df.assign(Denmark=np.dot(df[untreated].values, weights))
# add observation noise
for col in untreated + ["Denmark"]:
df[col] += np.random.normal(size=len(df), scale=0.1)
# add treatment effect
df["Denmark"] += np.where(df.index < treatment_time, 0, causal_impact)
# ensure we never see any negative sales
df = df.clip(lower=0)
return df
def generate_multicell_geolift_data() -> pd.DataFrame:
"""Generate synthetic data for a geolift example. This will consists of 6 untreated
countries. The treated unit `Denmark` is a weighted combination of the untreated
units. We additionally specify a treatment effect which takes effect after the
`treatment_time`. The timeseries data is observed at weekly resolution and has
annual seasonality, with this seasonality being a drawn from a Gaussian Process with
a periodic kernel."""
n_years = 4
treatment_time = pd.to_datetime("2022-01-01")
causal_impact = 0.2
time = pd.date_range(start="2019-01-01", periods=52 * n_years, freq="W")
untreated = [
"u1",
"u2",
"u3",
"u4",
"u5",
"u6",
"u7",
"u8",
"u9",
"u10",
"u11",
"u12",
]
df = (
pd.DataFrame(
{
country: create_series(n_years=n_years, intercept=3)
for country in untreated
}
)
.assign(time=time)
.set_index("time")
)
treated = ["t1", "t2", "t3", "t4"]
for treated_geo in treated:
# create treated unit as a weighted sum of the untreated units
weights = np.random.dirichlet(np.ones(len(untreated)), size=1)[0]
df[treated_geo] = np.dot(df[untreated].values, weights)
# add treatment effect
df[treated_geo] += np.where(df.index < treatment_time, 0, causal_impact)
# add observation noise to all geos
for col in untreated + treated:
df[col] += np.random.normal(size=len(df), scale=0.1)
# ensure we never see any negative sales
df = df.clip(lower=0)
return df
# -----------------
# UTILITY FUNCTIONS
# -----------------
def generate_seasonality(
n: int = 12, amplitude: int = 1, length_scale: float = 0.5
) -> np.ndarray:
"""Generate monthly seasonality by sampling from a Gaussian process with a
Gaussian kernel, using numpy code"""
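
    Example
    --------
    >>> from causalpy.data.simulate_data import generate_seasonality
    >>> seasonality = generate_seasonality(n=12)
    >>> seasonality.shape
    (12,)
    """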
# Generate the covariance matrix
x = np.linspace(0, 1, n)
x1, x2 = np.meshgrid(x, x)
cov = periodic_kernel(
x1, x2, period=1, length_scale=length_scale, amplitude=amplitude
)
# Generate the seasonality
seasonality = np.random.multivariate_normal(np.zeros(n), cov)
return seasonality
def periodic_kernel(
x1: np.ndarray,
x2: np.ndarray,
period: int = 1,
length_scale: float = 1.0,
amplitude: int = 1,
) -> np.ndarray:
"""Generate a periodic kernel for gaussian process"""
return amplitude**2 * np.exp(
-2 * np.sin(np.pi * np.abs(x1 - x2) / period) ** 2 / length_scale**2
)
def create_series(
n: int = 52,
amplitude: int = 1,
length_scale: int = 2,
n_years: int = 4,
intercept: int = 3,
) -> np.ndarray:
"""
    Returns seasonality data from generate_seasonality, tiled so that the same
    seasonal pattern repeats over multiple years
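
    Example
    --------
    >>> from causalpy.data.simulate_data import create_series
    >>> y = create_series(n=52, n_years=4)
    >>> y.shape
    (208,)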
"""
return np.tile(
        generate_seasonality(n=n, amplitude=amplitude, length_scale=length_scale)
        + intercept,
n_years,
)
def generate_piecewise_its_data(
N: int = 100,
interruption_times: list[int] | None = None,
baseline_intercept: float = 10.0,
baseline_slope: float = 0.1,
level_changes: list[float] | None = None,
slope_changes: list[float] | None = None,
noise_sigma: float = 1.0,
seed: int | None = None,
) -> tuple[pd.DataFrame, dict]:
"""
Generate piecewise Interrupted Time Series data with known ground truth parameters.
This function creates synthetic data for testing and demonstrating piecewise ITS
/ segmented regression models. The data follows the model:
y_t = β₀ + β₁t + Σₖ(level_k · I_k(t) + slope_k · R_k(t)) + ε_t
Where:
- I_k(t) = 1 if t >= T_k else 0 (step function for level change)
- R_k(t) = max(0, t - T_k) (ramp function for slope change)
Parameters
----------
N : int, default=100
Number of time points in the series.
interruption_times : list[int], optional
List of time indices where interruptions occur. Defaults to [50].
baseline_intercept : float, default=10.0
The intercept (β₀) of the baseline trend.
baseline_slope : float, default=0.1
The slope (β₁) of the baseline trend.
level_changes : list[float], optional
List of level changes at each interruption. Length must match
interruption_times. If None, defaults to [5.0] for single interruption.
slope_changes : list[float], optional
List of slope changes at each interruption. Length must match
interruption_times. If None, defaults to [0.0] (no slope change).
noise_sigma : float, default=1.0
Standard deviation of the Gaussian noise.
seed : int, optional
Random seed for reproducibility.
Returns
-------
df : pd.DataFrame
DataFrame with columns:
- 't': time index (0 to N-1)
- 'y': observed outcome with noise
- 'y_true': outcome without noise (ground truth)
- 'counterfactual': baseline trend without intervention effects
- 'effect': true causal effect at each time point
params : dict
Dictionary containing the true parameters:
- 'baseline_intercept': β₀
- 'baseline_slope': β₁
- 'level_changes': list of level changes
- 'slope_changes': list of slope changes
- 'interruption_times': list of interruption times
- 'noise_sigma': noise standard deviation
Examples
--------
>>> from causalpy.data.simulate_data import generate_piecewise_its_data
>>> # Single interruption with level and slope change
>>> df, params = generate_piecewise_its_data(
... N=100,
... interruption_times=[50],
... level_changes=[5.0],
... slope_changes=[0.2],
... seed=42,
... )
>>> df.shape
(100, 5)
>>> # Multiple interruptions
>>> df, params = generate_piecewise_its_data(
... N=150,
... interruption_times=[50, 100],
... level_changes=[3.0, -2.0],
... slope_changes=[0.1, -0.15],
... seed=42,
... )
>>> len(params["interruption_times"])
2
>>> # Level change only (no slope change)
>>> df, params = generate_piecewise_its_data(
... N=100,
... interruption_times=[50],
... level_changes=[5.0],
... slope_changes=[0.0],
... seed=42,
... )
"""
# Set defaults
if interruption_times is None:
interruption_times = [50]
n_interruptions = len(interruption_times)
if level_changes is None:
level_changes = [5.0] * n_interruptions
if slope_changes is None:
slope_changes = [0.0] * n_interruptions
# Validate inputs
if len(level_changes) != n_interruptions:
raise ValueError(
f"level_changes length ({len(level_changes)}) must match "
f"interruption_times length ({n_interruptions})"
)
if len(slope_changes) != n_interruptions:
raise ValueError(
f"slope_changes length ({len(slope_changes)}) must match "
f"interruption_times length ({n_interruptions})"
)
for t_k in interruption_times:
if t_k < 0 or t_k >= N:
raise ValueError(
f"Interruption time {t_k} is outside valid range [0, {N - 1}]"
)
# Set random seed
if seed is not None:
np.random.seed(seed)
# Generate time index
t = np.arange(N)
# Compute baseline (counterfactual)
counterfactual = baseline_intercept + baseline_slope * t
# Compute intervention effects
effect = np.zeros(N)
for k, t_k in enumerate(interruption_times):
# Step function: I_k(t) = 1 if t >= t_k
step = (t >= t_k).astype(float)
# Ramp function: R_k(t) = max(0, t - t_k)
ramp = np.maximum(0, t - t_k).astype(float)
effect += level_changes[k] * step + slope_changes[k] * ramp
# Compute true outcome (without noise)
y_true = counterfactual + effect
# Add noise
noise = np.random.normal(0, noise_sigma, N)
y = y_true + noise
# Create DataFrame
df = pd.DataFrame(
{
"t": t,
"y": y,
"y_true": y_true,
"counterfactual": counterfactual,
"effect": effect,
}
)
# Store parameters
params = {
"baseline_intercept": baseline_intercept,
"baseline_slope": baseline_slope,
"level_changes": level_changes,
"slope_changes": slope_changes,
"interruption_times": interruption_times,
"noise_sigma": noise_sigma,
}
return df, params
def generate_staggered_did_data(
n_units: int = 50,
n_time_periods: int = 20,
treatment_cohorts: dict[int, int] | None = None,
treatment_effects: dict[int, float] | None = None,
unit_fe_scale: float = 2.0,
time_fe_scale: float = 1.0,
sigma: float = 0.5,
seed: int | None = None,
) -> pd.DataFrame:
"""
Generate synthetic panel data with staggered treatment adoption.
Creates a balanced panel dataset where different cohorts of units receive
treatment at different times. Supports dynamic treatment effects that vary
by event-time (time relative to treatment).
Parameters
----------
n_units : int, default=50
Total number of units in the panel.
n_time_periods : int, default=20
Number of time periods in the panel.
treatment_cohorts : dict[int, int], optional
Dictionary mapping treatment time to number of units in that cohort.
Units not assigned to any cohort are never-treated.
Default: {5: 10, 10: 10, 15: 10} (3 cohorts of 10 units each,
leaving 20 never-treated units).
treatment_effects : dict[int, float], optional
Dictionary mapping event-time (t - G) to treatment effect.
Event-time 0 is the first treated period.
Default: {0: 1.0, 1: 1.5, 2: 2.0, 3: 2.5} with constant effect
of 2.5 for all subsequent periods.
unit_fe_scale : float, default=2.0
Scale of unit fixed effects (drawn from Normal(0, unit_fe_scale)).
time_fe_scale : float, default=1.0
Scale of time fixed effects (drawn from Normal(0, time_fe_scale)).
sigma : float, default=0.5
Standard deviation of idiosyncratic noise.
seed : int, optional
Random seed for reproducibility.
Returns
-------
pd.DataFrame
Panel data with columns:
- unit: Unit identifier
- time: Time period
- treated: Binary indicator (1 if treated at time t, 0 otherwise)
- treatment_time: Time of treatment adoption (np.inf for never-treated)
- y: Observed outcome
- y0: Counterfactual outcome (for validation)
- tau: True treatment effect (for validation)
Examples
--------
>>> from causalpy.data.simulate_data import generate_staggered_did_data
>>> df = generate_staggered_did_data(n_units=30, n_time_periods=15, seed=42)
    >>> df.shape
    (450, 7)
Notes
-----
The data generating process is:
.. math::
Y_{it} = \\alpha_i + \\lambda_t + \\tau_{it} \\cdot D_{it} + \\varepsilon_{it}
where :math:`\\alpha_i` is the unit fixed effect, :math:`\\lambda_t` is the
time fixed effect, :math:`D_{it}` is the treatment indicator, and
:math:`\\tau_{it}` is the dynamic treatment effect that depends on
event-time :math:`e = t - G_i`.
"""
    # default_rng(None) draws fresh entropy from the OS, so this handles both cases
    local_rng = np.random.default_rng(seed)
# Default treatment cohorts: 3 cohorts at times 5, 10, 15
if treatment_cohorts is None:
treatment_cohorts = {5: 10, 10: 10, 15: 10}
# Default dynamic treatment effects: ramp up then stabilize
if treatment_effects is None:
treatment_effects = {0: 1.0, 1: 1.5, 2: 2.0, 3: 2.5}
# Validate cohort assignments don't exceed n_units
total_treated = sum(treatment_cohorts.values())
if total_treated > n_units:
raise ValueError(
f"Total units in treatment cohorts ({total_treated}) "
f"exceeds n_units ({n_units})"
)
# Generate unit fixed effects
unit_fe = local_rng.normal(0, unit_fe_scale, n_units)
# Generate time fixed effects
time_fe = local_rng.normal(0, time_fe_scale, n_time_periods)
# Assign treatment times to units
treatment_times = np.full(n_units, np.inf) # Default: never treated
unit_idx = 0
for g, n_cohort in treatment_cohorts.items():
treatment_times[unit_idx : unit_idx + n_cohort] = g
unit_idx += n_cohort
# Shuffle treatment assignments
local_rng.shuffle(treatment_times)
# Build panel data
rows = []
for i in range(n_units):
for t in range(n_time_periods):
g_i = treatment_times[i]
is_treated = t >= g_i
# Counterfactual outcome (no treatment)
y0 = unit_fe[i] + time_fe[t]
# Treatment effect based on event-time
if is_treated:
event_time = int(t - g_i)
# Use specified effect or last available effect for later periods
if event_time in treatment_effects:
tau = treatment_effects[event_time]
else:
# Use the effect for the maximum specified event-time
max_event_time = max(treatment_effects.keys())
tau = treatment_effects[max_event_time]
else:
tau = 0.0
# Add noise
epsilon = local_rng.normal(0, sigma)
# Observed outcome
y = y0 + tau + epsilon
rows.append(
{
"unit": i,
"time": t,
"treated": int(is_treated),
"treatment_time": g_i,
"y": y,
"y0": y0,
"tau": tau,
}
)
df = pd.DataFrame(rows)
return df