Core Modules

The msnoise.core package contains all internal implementation modules. Plugin authors should import from msnoise.plugins for a stable API.

Database & Connection

MSNoise database connection and logging utilities.

msnoise.core.db.connect(inifile=None)

Establishes a connection to the database and returns a Session object.

Parameters:

inifile (str) – The path to the db.ini file to use. Defaults to db.ini in the current working directory (os.getcwd()).

Return type:

sqlalchemy.orm.session.Session

Returns:

Session object, needed for many of the other API methods.

msnoise.core.db.create_database_inifile(tech, hostname, database, username, password, prefix='')

Creates the db.ini file based on supplied parameters.

Parameters:
  • tech (int) – The database technology used: 1=sqlite, 2=mysql

  • hostname (str) – The hostname of the server (if tech=2) or the name of the sqlite file (if tech=1)

  • database (str) – The database name

  • username (str) – The user name

  • password (str) – The password of the user

  • prefix (str) – The prefix to use for all tables

Returns:

None

msnoise.core.db.get_engine(inifile=None)

Returns a SQLAlchemy Engine.

Parameters:

inifile (str) – The path to the db.ini file to use. Defaults to db.ini in the current working directory (os.getcwd()).

Return type:

sqlalchemy.engine.Engine

Returns:

An Engine Object

msnoise.core.db.get_logger(name, loglevel=None, with_pid=False)

Returns the currently configured logger or configures a new one.

msnoise.core.db.read_db_inifile(inifile=None)

Reads the parameters from the db.ini file.

Parameters:

inifile (str) – The path to the db.ini file to use. Defaults to db.ini in the current working directory (os.getcwd()).

Return type:

collections.namedtuple

Returns:

tech, hostname, database, username, password

Configuration & Params

MSNoise configuration management, parameter merging, and plot filename helpers.

msnoise.core.config.build_plot_outfile(outfile, plot_name, lineage_names, *, pair=None, components=None, mov_stack=None, extra=None)

Resolve the output filename for a plot.

When outfile starts with "?", replaces "?" with a standardised auto-generated tag and prepends plot_name, producing a filename of the form:

<plot_name>__<lineage>[__<pair>][__<comp>][__m<ms>][__<extra>].<ext>

If outfile does not start with "?", it is returned unchanged (the caller supplied an explicit path).

If outfile is None or empty, returns None.

Parameters:
  • outfile – Raw outfile argument passed to the plot function (e.g. "?.png").

  • plot_name – Short plot identifier, e.g. "ccftime" or "dvv_mwcs". Must not contain __.

  • lineage_names – List of step name strings (e.g. ["preprocess_1", "cc_1", "filter_1", "stack_1"]) from MSNoiseResult.lineage_names.

  • pair – Optional station pair string ("NET.STA.LOC-NET.STA.LOC"). Dots are kept; colons are replaced with -.

  • components – Optional component string, e.g. "ZZ".

  • mov_stack – Optional moving-stack tuple or string, e.g. ("1D", "1D"), rendered as "m1D-1D" in the filename.

  • extra – Optional extra qualifier string appended last.

Returns:

Resolved filename string, or outfile unchanged.
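
The "?" expansion rule above can be sketched in a few lines. This is a simplified stand-in, not the real helper: lineage_tag here replaces the abbreviated lineage string that build_plot_outfile derives via lineage_to_plot_tag(), and only the tuple form of mov_stack is handled.

```python
import os

def build_plot_outfile_sketch(outfile, plot_name, lineage_tag,
                              pair=None, components=None,
                              mov_stack=None, extra=None):
    """Sketch of the '?' expansion rule documented above."""
    if not outfile:
        return None
    if not outfile.startswith("?"):
        return outfile                       # explicit path: unchanged
    ext = os.path.splitext(outfile)[1]       # e.g. ".png"
    parts = [plot_name, lineage_tag]
    if pair:
        parts.append(pair.replace(":", "-")) # dots kept, colons replaced
    if components:
        parts.append(components)
    if mov_stack:                            # tuple form assumed here
        parts.append("m" + "-".join(mov_stack))
    if extra:
        parts.append(extra)
    return "__".join(parts) + ext
```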

msnoise.core.config.create_config_set(session, set_name, set_number_hint=None)

Create a configuration set for a workflow step.

Parameters:
  • session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()

  • set_name (str) – The workflow step category for the new set (e.g., 'mwcs')

  • set_number_hint (int or None) – Preferred set number for the new set, if any

Return type:

int or None

Returns:

The set_number if the set was created successfully, None otherwise

msnoise.core.config.create_project_from_yaml(session, yaml_path)

Seed a freshly-initialised database from a project YAML file.

The project YAML uses category_N keys so that multiple config sets of the same category are unambiguous. Each entry may declare an after field (string or list of strings) naming the category_N steps that must precede it; one WorkflowLink row is created per after entry. Steps whose after target is absent from the YAML produce a warning instead of an error, so partial YAMLs and incremental DB builds are both supported.

Example:

msnoise_project_version: 1

global_1:
  startdate: "2013-04-01"
  enddate:   "2014-10-31"

preprocess_1:
  after: global_1
  cc_sampling_rate: 20.0

cc_1:
  after: preprocess_1
  cc_type_single_station_AC: PCC
  whitening: "N"

filter_1:
  after: cc_1          # single parent
  freqmin: 1.0
  freqmax: 2.0
  AC: "Y"

filter_2:
  after: cc_1          # fan-out from the same cc step
  freqmin: 0.5
  freqmax: 1.0
  AC: "Y"

stack_1:
  after: [filter_1, filter_2]
  mov_stack: "(('2D','1D'))"

refstack_1:
  after: [filter_1, filter_2]   # sibling of stack, not child
  ref_begin: "2013-04-01"
  ref_end:   "2014-10-31"

mwcs_1:
  after: [stack_1, refstack_1]   # list: requires both parents
  freqmin: 1.0
  freqmax: 2.0
Parameters:
  • session – SQLAlchemy session (from connect()).

  • yaml_path – Path to a project YAML file.

Returns:

(created_steps, warnings) — list of category_N strings created and list of warning strings for missing after targets.

Raises:

ValueError – if the YAML is missing msnoise_project_version or a key is not in category_N format.

msnoise.core.config.export_project_to_yaml(session, yaml_path, only_non_defaults=True)

Snapshot the current project DB state as a project YAML file.

The inverse of create_project_from_yaml(). Reads all active WorkflowStep rows, their config sets, the WorkflowLink topology, all DataSource rows, and all Station rows, and writes a msnoise_project_version: 1 YAML that can be re-applied with msnoise db init --from-yaml.

Parameters:
  • session – SQLAlchemy session.

  • yaml_path – Destination path for the YAML file.

  • only_non_defaults – When True (default) only keys whose value differs from the CSV default are written, keeping the file minimal. Set to False to write every key explicitly.

Returns:

Path string of the written file.

msnoise.core.config.delete_config_set(session, set_name, set_number)

Delete a configuration set for a workflow step.

Parameters:
  • session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()

  • set_name (str) – The name of the workflow step (e.g., ‘mwcs’, ‘mwcs_dtt’, etc.)

  • set_number (int) – The set number to delete

Return type:

bool

Returns:

True if the set was deleted successfully, False otherwise

msnoise.core.config.get_config(session, name=None, isbool=False, plugin=None, category='global', set_number=None)

Get the value of one or all config bits from the database.

Parameters:
  • session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()

  • name (str) – The name of the config bit to get. If omitted, a dictionary with all config items will be returned

  • isbool (bool) – if True, returns True/False for config name. Defaults to False

  • plugin (str) – if provided, gives the name of the Plugin config to use. E.g. if “Amazing” is provided, MSNoise will try to load the “AmazingConfig” entry point. See Extending MSNoise with Plugins for details.

  • category (str) – The config category (default 'global'). Use e.g. 'stack', 'filter', 'mwcs' to target a specific config set.

  • set_number (int or None) – The config set number within the category (default None for global config).

Return type:

str, bool or dict

Returns:

the value for name or a dict of all config values

msnoise.core.config.get_config_categories_definition()

Return ordered display metadata for all workflow categories.

Each entry is a (category_key, display_name, level) tuple where level is the DAG depth (global=0, preprocess/psd=1 — both branch off global, …).

Derived from get_category_display_info() so plugin-added categories appear automatically. The list is ordered for UI rendering (CC branch first, PSD branch at end).

msnoise.core.config.get_config_set_details(session, set_name, set_number, format='list')

Get details of a specific configuration set.

Parameters:
  • session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()

  • set_name (str) – The workflow step category (e.g., 'mwcs')

  • set_number (int) – The set number to inspect

  • format (str) – Output format (default 'list')

Return type:

list

Returns:

List of config entries in the set

msnoise.core.config.get_config_sets_organized(session)

Get configuration sets organized by category in the standard order

msnoise.core.config.get_merged_params_for_lineage(db, orig_params, step_params, lineage)

Build a MSNoiseParams for the given lineage.

Returns (lineage, lineage_names, MSNoiseParams).

msnoise.core.config.get_params(session)

Build a single-layer MSNoiseParams for global config.

Queries the Config table directly for all (category='global', set_number=1) rows and casts each value to its declared param_type. Does not depend on default.py.

For full pipeline params (per-step config included), use get_merged_params_for_lineage() instead.

Parameters:

session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()

Return type:

MSNoiseParams

msnoise.core.config.lineage_to_plot_tag(lineage_names)

Convert a list of step names to a compact plot filename tag.

Each step is abbreviated using _STEP_ABBREVS and the set number is appended. Steps are joined with -.

Examples:

lineage_to_plot_tag(["preprocess_1","cc_1","filter_1","stack_1"])
# → "pre1-cc1-f1-stk1"

lineage_to_plot_tag(["preprocess_1","cc_1","filter_1","stack_1",
                      "refstack_1","mwcs_1","mwcs_dtt_1","mwcs_dtt_dvv_1"])
# → "pre1-cc1-f1-stk1-ref1-mwcs1-dtt1-dvv1"
Parameters:

lineage_names – List of step name strings from MSNoiseResult.lineage_names.

Returns:

Hyphen-joined abbreviated tag string.

msnoise.core.config.list_config_sets(session, set_name=None)

List all configuration sets, optionally filtered by category.

Parameters:
  • session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()

  • set_name (str or None) – If given, only sets of this category are listed

Return type:

list

Returns:

List of tuples (category, set_number, entry_count)

msnoise.core.config.parse_config_key(key)

Parse a dot-notation config key into (category, set_number, name).

Accepted forms:

output_folder            →  ("global", 1, "output_folder")   # global shorthand
cc.cc_sampling_rate      →  ("cc",     1, "cc_sampling_rate")
cc.2.cc_sampling_rate    →  ("cc",     2, "cc_sampling_rate")
mwcs.2.mwcs_wlen         →  ("mwcs",   2, "mwcs_wlen")
Raises:

ValueError – for malformed keys (too many parts, non-integer set number, or bare name that is not a global parameter).
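
The parsing rules above can be sketched as follows. GLOBAL_PARAMS is a hypothetical stand-in for the real set of global parameter names, which parse_config_key looks up internally.

```python
# Hypothetical set of known global parameter names (assumption for this demo).
GLOBAL_PARAMS = {"output_folder", "startdate", "enddate"}

def parse_config_key_sketch(key):
    """Sketch of the dot-notation parsing rules listed above."""
    parts = key.split(".")
    if len(parts) == 1:                      # bare name: global shorthand
        if parts[0] not in GLOBAL_PARAMS:
            raise ValueError(f"{key!r} is not a global parameter")
        return ("global", 1, parts[0])
    if len(parts) == 2:                      # category.name -> set 1
        return (parts[0], 1, parts[1])
    if len(parts) == 3:                      # category.N.name
        try:
            set_number = int(parts[1])
        except ValueError:
            raise ValueError(f"non-integer set number in {key!r}")
        return (parts[0], set_number, parts[2])
    raise ValueError(f"too many parts in {key!r}")
```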

msnoise.core.config.update_config(session, name, value, plugin=None, category='global', set_number=None)

Update one config bit in the database.

Parameters:
  • session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()

  • name (str) – The name of the config bit to set.

  • value (str) – The value of parameter name. Can also be NULL if you don’t want to use this particular parameter.

  • plugin (str) – if provided, gives the name of the Plugin config to use. E.g. if “Amazing” is provided, MSNoise will try to load the “AmazingConfig” entry point. See Extending MSNoise with Plugins for details.

  • category (str) – The config category (default ‘global’). Use e.g. ‘stack’, ‘filter’, ‘mwcs’ to target a specific config set.

  • set_number (int or None) – The config set number within the category (default None for global config). Use e.g. 1 for stack_1.

Workflow & Jobs

MSNoise workflow topology, job management, lineage resolution and scheduling.

msnoise.core.workflow.build_movstack_datelist(session)

Creates a date array for the analysis period. The returned tuple contains a start and an end date, and a list of individual dates between the two.

Parameters:

session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()

Return type:

tuple

Returns:

(start, end, datelist)

msnoise.core.workflow.build_ref_datelist(params, session=None)

Creates a date array for the REF. The returned tuple contains a start and an end date, and a list of individual dates between the two.

Parameters:
  • params (AttribDict) – A params object as obtained by get_params().

  • session (sqlalchemy.orm.session.Session, optional) – A Session object, as obtained by connect(). Required only when ref_begin is "1970-01-01" (auto-detect from data availability).

Return type:

tuple

Returns:

(start, end, datelist)

msnoise.core.workflow.compute_rolling_ref(data, ref_begin, ref_end)

Compute a per-index rolling reference from CCF data.

For each time index i, the reference is:

mean(data[i + ref_begin : i + ref_end])

Both ref_begin and ref_end must be negative integers with ref_begin < ref_end <= -1. Use ref_end=-1 to exclude the current window (compare to the previous N windows).

Uses min_periods=1 semantics: the first few steps receive whatever partial window is available rather than NaN.

Return type:

numpy.ndarray

Returns:

Array of shape (n_times, n_lag_samples); row i is the reference for time step i.
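
A numpy sketch of the semantics described above. The handling of the initial partial windows is a simplification of the real min_periods=1 behaviour: an empty window falls back to a single available sample.

```python
import numpy as np

def compute_rolling_ref_sketch(data, ref_begin, ref_end):
    """Sketch of the per-index rolling reference described above.

    data has shape (n_times, n_lag_samples); ref_begin and ref_end
    are negative ints with ref_begin < ref_end <= -1.
    """
    out = np.empty_like(data, dtype=float)
    for i in range(data.shape[0]):
        lo = max(i + ref_begin, 0)
        hi = i + ref_end                 # exclusive upper bound
        if hi <= lo:
            # Simplified stand-in for min_periods=1: use whatever
            # partial window is available rather than NaN.
            hi = lo + 1
        out[i] = data[lo:hi].mean(axis=0)
    return out
```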

Create a link between two workflow steps

Create workflow links automatically between existing workflow steps, following the DAG defined by get_workflow_chains().

Linking rule: every step in a source category is linked to every step in each of its successor categories (ALL×ALL). Set numbers carry no topological meaning — all combinations are created and the user removes unwanted links via the admin UI. The DAG is determined entirely by WorkflowLink rows.

Returns:

tuple: (created_count, existing_count, error_message)

msnoise.core.workflow.create_workflow_step(session, step_name, category, set_number, description=None)

Create a new workflow step

msnoise.core.workflow.create_workflow_steps_from_config_sets(session)

Create workflow steps automatically from all existing config sets, sorted by natural workflow order.

Returns:

tuple: (created_count, existing_count, error_message)

msnoise.core.workflow.extend_days(days)

Return a DatetimeIndex from days extended by one extra day at the end.

Replaces the pandas 1.x pattern:

idx = pd.to_datetime(days)
idx = idx.append(pd.DatetimeIndex([idx[-1] + pd.Timedelta("1d")]))

which was removed in pandas 2.0.

Parameters:

days – sequence of date-like values (strings, dates, datetimes…)

Return type:

pandas.DatetimeIndex
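
One pandas-2-compatible way to express the same operation is via DatetimeIndex.union; this is a sketch of the documented behaviour, not the actual implementation.

```python
import pandas as pd

def extend_days_sketch(days):
    """Return `days` as a DatetimeIndex with one extra day appended."""
    idx = pd.to_datetime(list(days))
    # union() accepts list-likes and keeps the result sorted, so the
    # extra day lands at the end.
    return idx.union([idx[-1] + pd.Timedelta(days=1)])
```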

msnoise.core.workflow.get_done_lineages_for_category(session, category)

Return all distinct computed lineages for a given workflow step category.

Queries Job rows whose associated WorkflowStep.category matches category and whose flag is 'D' (done), then de-duplicates and resolves each lineage string into an ordered list of step-name strings (upstream → downstream, including the step itself).

This is the correct way to enumerate output folders for a step that may be reached via multiple upstream paths (e.g. multiple filters, multiple MWCS configs), because it reflects what was actually computed rather than what the DAG topology suggests.

Example:

lineages = get_done_lineages_for_category(db, 'stretching')
# → [
#     ['preprocess_1', 'cc_1', 'filter_1', 'stack_1', 'stretching_1'],
#     ['preprocess_1', 'cc_1', 'filter_2', 'stack_1', 'stretching_1'],
# ]
Parameters:

category (str) – Workflow step category, e.g. 'stretching', 'mwcs_dtt', 'wavelet_dtt'.

Return type:

list[list[str]]

Returns:

Sorted list of unique lineage-name lists.

msnoise.core.workflow.get_filter_steps_for_cc_step(session, cc_step_id)

Get all filter steps that are children of a specific CC step.

Parameters:
  • session – Database session

  • cc_step_id – The step_id of the CC step

Returns:

List of filter workflow steps that are successors of the CC step

msnoise.core.workflow.get_job_types(session, jobtype)

Return job counts grouped by flag for a given jobtype string.

Works with the v2 workflow model where jobtype is a step name such as "cc_1". Returns a list of (count, flag) tuples.

Parameters:
  • session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()

  • jobtype (str) – Step name, e.g. "cc_1"

Return type:

list of (int, str)

Returns:

List of (count, flag) pairs

msnoise.core.workflow.get_lineages_to_step_id(session, step_id, include_self=True, max_depth=50, max_paths=5000)

Enumerate upstream lineages (all distinct paths) ending at step_id.

Returns a list of paths, each path being a list[WorkflowStep] ordered upstream -> downstream. This preserves branch structure (unlike get_upstream_steps_for_step_id which de-duplicates nodes).

Safety:
  • max_depth prevents infinite loops in case of bad/cyclic graphs

  • max_paths prevents combinatorial explosion

Parameters:
  • session (sqlalchemy.orm.session.Session)

  • step_id (int)

  • include_self (bool) – If True, each path ends with the step itself.

  • max_depth (int)

  • max_paths (int)

Return type:

list[list[WorkflowStep]]

msnoise.core.workflow.get_next_job_for_step(session, step_category='preprocess', flag='T', group_by='day', limit_days=None, chunk_size=0)

Return a claimed batch of jobs for a workflow step category.

Note

Most callers should use get_next_lineage_batch() instead, which wraps this function and also resolves lineage strings, params, and station-pair metadata into a ready-to-use batch dict.

group_by:
  • “day”: claim all jobs for the selected (step_id, jobtype, day)

  • “pair”: claim all jobs for the selected (step_id, jobtype, pair)

  • “pair_lineage”: claim all jobs for the selected (step_id, jobtype, pair, lineage)

  • “day_lineage”: claim all jobs for the selected (step_id, jobtype, day, lineage)

chunk_size:

Only applies to group_by="day_lineage". When > 0, at most chunk_size jobs are claimed per batch instead of the full day. Useful for steps with O(N²) work per day (CC) or large per-day station counts (PSD) so that multiple workers can share the same day in parallel without write conflicts.

chunk_size=0 (default) claims everything — identical to the original behaviour.

msnoise.core.workflow.get_next_lineage_batch(db, step_category, group_by='pair_lineage', loglevel='INFO', day_value=None, chunk_size=0)

Standard worker prolog for lineage-aware steps.

  • Claims the next batch of jobs for step_category using get_next_job_for_step.

  • Extracts (pair, lineage_str, refs, days).

  • Loads current step config (from the job row).

  • Resolves lineage_str -> WorkflowStep objects.

  • Builds a MSNoiseParams for that lineage (one layer per category).

Parameters:
  • chunk_size (int, optional) – Passed to get_next_job_for_step(). Only effective for group_by="day_lineage". When > 0, at most chunk_size jobs are claimed per batch, enabling multiple workers to share the same day in parallel. Default 0 = claim everything (original behaviour).

Returns:
dict with keys:
  • jobs, step

  • pair, lineage_str, lineage_steps

  • lineage_names — full list including current step name

  • lineage_names_upstream — full minus current step (replaces manual [:-1])

  • lineage_names_mov — upstream with any refstack_* entries stripped

    (used by mwcs/wct/stretching to find MOV CCFs)

  • refs, days

  • step_params, params — params is a MSNoiseParams instance

or None if no jobs were claimed (caller should continue/sleep).

msnoise.core.workflow.get_refstack_lineage_for_filter(session, filterid, refstack_set_number=1)

Get the full lineage path from root through a filter step to its refstack.

Refstack is now a sibling of stack (both children of the filter pass-through). Returns the path ending at refstack_M directly downstream of filter_{filterid} — suitable for xr_get_ref().

Example for the default single-pipeline:

get_refstack_lineage_for_filter(db, 1)
# → ['preprocess_1', 'cc_1', 'filter_1', 'refstack_1']
Parameters:
  • filterid (int) – The filter set_number (e.g. 1 for filter_1).

  • refstack_set_number (int) – Which refstack set to use (default 1).

Return type:

list of str

msnoise.core.workflow.get_t_axis(params)

Returns the time axis (in seconds) of the CC functions.

Return type:

numpy.array

Returns:

the time axis in seconds
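
A plausible construction of such a lag axis is sketched below. The exact sample count and the parameter names maxlag and cc_sampling_rate are assumptions for illustration; the real helper reads them from params.

```python
import numpy as np

def get_t_axis_sketch(maxlag, cc_sampling_rate):
    """Symmetric lag axis from -maxlag to +maxlag (assumed parameters)."""
    # Odd sample count so that zero lag is included exactly.
    n = int(2 * maxlag * cc_sampling_rate) + 1
    return np.linspace(-maxlag, maxlag, n)
```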

msnoise.core.workflow.get_workflow_job_counts(db)

Get job counts by status for the workflow

Parameters:

db – Database connection

Returns:

Dictionary with job counts by status

Get all links in a workflow

msnoise.core.workflow.get_workflow_steps(session)

Get all steps in a workflow

msnoise.core.workflow.is_next_job_for_step(session, step_category='preprocess', flag='T')

Are there any workflow-aware Jobs in the database with the specified flag and step category?

Parameters:
  • session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()

  • step_category (str) – Workflow step category (e.g., “preprocess”, “qc”)

  • flag (str) – Status of the Job: “T”odo, “I”n Progress, “D”one.

Return type:

bool

Returns:

True if at least one Job matches the criteria, False otherwise.

msnoise.core.workflow.lineage_str_to_step_names(lineage_str, sep='/')

Convert a lineage string like “preprocess_1/cc_1/filter_1” into a list of step_name strings, preserving order.

msnoise.core.workflow.lineage_str_to_steps(session, lineage_str, sep='/', strict=True)

Resolve a lineage string to a list of WorkflowStep ORM objects (ordered).

Parameters:
  • session (sqlalchemy.orm.session.Session)

  • lineage_str (str) – e.g. "preprocess_1/cc_1/filter_1"

  • sep (str) – Separator used in lineage strings.

  • strict (bool) – If True, raises if any step_name can't be resolved. If False, silently skips missing steps.

Return type:

list[WorkflowStep]

msnoise.core.workflow.massive_insert_job(session, jobs)

Bulk-insert a list of job dicts into the jobs table.

Each dict must contain at minimum day, pair, jobtype, flag and lastmod keys. Optional keys: step_id, priority, lineage.

Parameters:
  • session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()

  • jobs (list of dict) – The job dicts to insert

msnoise.core.workflow.massive_update_job(session, jobs, flag='D')

Low-level routine to update a list of Job rows much faster than the ORM. Uses Job.ref, which is unique.

Note: If session becomes invalid, will attempt to get a fresh session from the engine.

Parameters:
  • session (Session) – the database connection object

  • jobs (list or tuple) – a list of Job to update.

  • flag (str) – The destination flag.

msnoise.core.workflow.propagate_downstream(session, batch: dict) → int

Propagate a just-completed batch to all immediate downstream worker steps.

Called immediately after massive_update_job() marks a batch Done, only when params.global_.hpc is False (the default). In HPC mode the operator runs msnoise new_jobs --after <category> manually.

Delegates to the specialised propagate_* functions from s02_new_jobs for transitions that require non-trivial logic (fan-outs, sentinel jobs, etc.). For simple pair-preserving transitions it performs a targeted bulk upsert keyed on the batch’s exact (pair, day) tuples.

Parameters:
  • session (sqlalchemy.orm.Session)

  • batch (dict) – Return value of get_next_lineage_batch().

Return type:

int

Returns:

Total number of jobs created or bumped to "T".

msnoise.core.workflow.refstack_is_rolling(params)

Return True if the refstack configset uses rolling-index mode.

Rolling mode is indicated by ref_begin being a negative integer string (e.g. "-5"). In this mode no REF file is written to disk; the reference is computed on-the-fly at MWCS/stretching/WCT time via compute_rolling_ref().

Parameters:

params (obspy.core.AttribDict) – Merged parameter set containing ref_begin.

Return type:

bool
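
The rolling-mode check reduces to "is ref_begin a negative-integer string?", which can be sketched as:

```python
def refstack_is_rolling_sketch(ref_begin):
    """True when ref_begin is a negative-integer string, e.g. "-5"."""
    try:
        return int(str(ref_begin)) < 0
    except ValueError:
        # A date string like "2013-04-01" is not an integer: fixed-date mode.
        return False
```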

msnoise.core.workflow.refstack_needs_recompute(session, pair, cc_lineage_prefix, params)

Return True if the REF stack needs to be recomputed for this pair.

Queries Done stack jobs for pair across all active stack_N steps whose lineage starts with cc_lineage_prefix (e.g. ["preprocess_1", "cc_1", "filter_1"]), and checks whether any date falls inside the configured [ref_begin, ref_end] window.

Stack and refstack are now siblings (both children of filter/cc), so the refstack worker no longer has a stack step in its upstream lineage. Instead, the cc/filter prefix is shared by both branches and is used here to locate the sibling stack jobs.

Should only be called for Mode A (fixed-date) refstack jobs.

Parameters:
  • session – SQLAlchemy session.

  • pair – Station pair string "NET.STA.LOC:NET.STA.LOC".

  • cc_lineage_prefix – Lineage name list up to (but not including) the stack/refstack level, e.g. ["preprocess_1", "cc_1", "filter_1"]. Pass batch["lineage_names_upstream"] from the refstack worker — that list ends at filter_N (the pass-through above refstack).

  • params – MSNoiseParams for this lineage.

Return type:

bool

msnoise.core.workflow.reset_jobs(session, jobtype, alljobs=False, reset_i=True, reset_e=True, downstream=False)

Reset jobs back to "T"odo.

jobtype can be:

  • A specific step name, e.g. "cc_1" — resets only that step.

  • A category name, e.g. "cc" — resets all active steps in that category (cc_1, cc_2, …).

If downstream is True, all steps reachable via WorkflowLink from the resolved step(s) are also reset (breadth-first, respects is_active).

Parameters:
  • session (sqlalchemy.orm.session.Session) – A Session object

  • jobtype (str) – Step name or category name to reset.

  • alljobs (bool) – If True reset all jobs regardless of current flag; otherwise only resets "I" and/or "F" flagged jobs.

  • reset_i (bool) – Reset "I"n-progress jobs (default True).

  • reset_e (bool) – Reset "E"rror/failed jobs (default True).

  • downstream (bool) – Also reset every step reachable downstream via active WorkflowLink rows (default False).

msnoise.core.workflow.resolve_lineage_params(session, lineage_names)

Resolve a lineage name-list into a fully merged params object.

Given a list of step-name strings (as returned by get_done_lineages_for_category()), resolves them to WorkflowStep ORM objects and merges every step’s configuration into the global params, exactly as the processing steps themselves do via get_next_lineage_batch().

Returns (lineage_steps, lineage_names, params) — the same tuple as get_merged_params_for_lineage() — so callers can use params directly (it will have components_to_compute, mov_stack, etc.).

Example:

lineage_names = get_done_lineages_for_category(db, 'mwcs_dtt')[0]
_, _, params = resolve_lineage_params(db, lineage_names)
mov_stack = params.stack.mov_stack[0]
Parameters:

lineage_names – Ordered list of step-name strings, e.g. ['preprocess_1', 'cc_1', 'filter_1', 'stack_1', 'mwcs_1', 'mwcs_dtt_1'].

Return type:

tuple(list, list[str], MSNoiseParams)

msnoise.core.workflow.update_job(session, day, pair, jobtype, flag, step_id=None, priority=0, lineage=None, commit=True, returnjob=True, ref=None)

Updates or inserts a Job in the database. Workflow-aware: handles the step_id and lineage fields.

Parameters:
  • session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()

  • day (str) – The day in YYYY-MM-DD format

  • pair (str) – the name of the station pair

  • jobtype (str) – Job type string, e.g. "preprocess_1"

  • flag (str) – Status of the Job: “T”odo, “I”n Progress, “D”one, “F”ailed.

  • step_id (int or None) – WorkflowStep primary key, or None

  • priority (int) – Job priority (default 0)

  • lineage (str or None) – Lineage string encoding upstream configset chain

  • commit (bool) – Whether to directly commit (True, default) or not (False)

  • returnjob (bool) – Return the modified/inserted Job (True, default) or not (False)

  • ref (int or None) – If provided, look up the job by its primary key instead of (day, pair, jobtype, lineage).

Return type:

Job or None

Returns:

If returnjob is True, returns the modified/inserted Job.

msnoise.core.workflow.filter_within_daterange(date, start_date, end_date)

Check if a date falls within the configured range

msnoise.core.workflow.get_first_runnable_steps_per_branch(session, source_step_id, skip_categories=None)

Return the first runnable step on each outgoing branch from source_step_id, skipping steps in skip_categories (default: {“filter”}).

  • “Branch” roots are the immediate successors of source_step_id.

  • If skipped nodes fan out, each fan-out is treated as a separate branch.

  • Results are deduped (set behavior) and returned in stable order.

msnoise.core.workflow.get_step_successors(session, step_id)

Get all steps that this step feeds into

msnoise.core.workflow.get_workflow_graph(session)

Return workflow as nodes and edges for visualization, sorted by workflow order

msnoise.core.workflow.get_workflow_chains()

Return the full workflow adjacency map, including any plugin extensions.

Merges the built-in topology with addenda registered via the msnoise.plugins.workflow_chains entry point group. Each registered callable must return a dict of the same schema as the built-in chains:

{
    "my_step": {
        "next_steps": ["other_step"],
        "is_entry_point": False,
        "is_terminal": False,
    }
}

Plugin entries are merged in load order; later plugins can override earlier ones for the same category key.

Return type:

dict[str, dict]

msnoise.core.workflow.get_workflow_order()

Return the canonical category display order, including plugin extensions.

Merges the built-in order with any extra categories registered via the msnoise.plugins.workflow_order entry point group. Each registered callable must return a list of category name strings in the desired insertion order (appended after the built-in list).

Return type:

list[str]

msnoise.core.workflow.get_step_abbrevs()

Return a mapping of workflow category → short plot-filename abbreviation.

Merges the built-in abbreviations (from _BUILTIN_WORKFLOW_CHAINS) with any abbrev keys provided by plugin entry points via msnoise.plugins.workflow_chains. Falls back to the raw category name for any category without an abbrev entry.

Return type:

dict[str, str]

msnoise.core.workflow.get_category_display_info()

Return ordered display metadata for all workflow categories.

Each entry is a dict with keys category, display_name, level (DAG depth: global=0, preprocess/psd=1, cc=2, …). Ordered for UI rendering: the main CC branch first (preprocess→cc→…→dvv), then the PSD branch (psd→psd_rms). Plugin categories are appended at the end.

Return type:

list[dict]

msnoise.core.workflow.is_terminal_category(category)

Return True if category is a terminal workflow step (no successors).

Terminal categories are those with an empty next_steps list in get_workflow_chains(). Examples: psd_rms, mwcs_dtt_dvv.

Parameters:

category – Workflow category string.

Return type:

bool

msnoise.core.workflow.is_entry_category(category)

Return True if category is a DAG entry point (no predecessors).

Entry categories are those with is_entry_point: True in get_workflow_chains(). Currently only global.

Parameters:

category – Workflow category string.

Return type:

bool

msnoise.core.workflow.query_active_steps(session, categories)

Return all active WorkflowStep rows whose category is in categories.

Parameters:

categories – A single category string or an iterable of strings.

Return type:

list[WorkflowStep]

msnoise.core.workflow.resolve_lineage_ids(session, lids)

Batch-resolve a set of lineage_ids to their lineage strings.

Parameters:

lids – Iterable of integer lineage_ids (None values are skipped).

Returns:

{lineage_id: lineage_str} for every id that exists in the DB.

Return type:

dict[int, str]

msnoise.core.workflow.bulk_upsert_jobs(session, to_insert, to_bump_refs, now, *, bump_flag_filter=None)

Bulk-insert new jobs and bump existing ones back to "T".

Parameters:
  • to_insert – List of job dicts (keys: day, pair, jobtype, step_id, priority, flag, lastmod, lineage_id).

  • to_bump_refs – List of Job.ref primary-key values to reset to T.

  • now – datetime used for lastmod on bumped rows.

  • bump_flag_filter – If given, only rows whose current flag equals this value are bumped. Pass None to bump unconditionally.

Returns:

(n_inserted, n_bumped)

Return type:

tuple[int, int]

msnoise.core.workflow.get_category_cli_command(category: str) → list | None

Return the msnoise sub-command token list for category, or None.

Reads the "cli" key from get_workflow_chains() so that the mapping stays in the single source of truth. None means the category is a pass-through (no worker, no jobs).

Plugin packages extend the mapping by adding a "cli" key to the dict returned by their msnoise.plugins.workflow_chains entry point — no separate entry point is needed.

Parameters:

category – Workflow category string (e.g. "cc", "stack").

Return type:

list[str] or None

msnoise.core.workflow.run_workflow_plan(session, *, threads: int = 1, hpc: bool | None = None, from_category: str | None = None, until_category: str | None = None, chunk_size: int = 0) → list

Build an ordered execution plan for all active workflow steps.

Performs a topological sort of active WorkflowStep categories via WorkflowLink rows and returns an ordered list of RunStep namedtuples. Pass-through categories (filter, global) are omitted. Categories with no runnable command (see get_category_cli_command()) are also omitted.

The returned plan is pure data — no subprocesses are started. The CLI command msnoise utils run_workflow drives the actual execution.

Parameters:
  • session – SQLAlchemy session.

  • threads – Number of parallel worker threads/processes (-t N).

  • hpc – Override hpc flag from config. If None, reads from params.global_.hpc.

  • from_category – Skip all categories before this one (inclusive start).

  • until_category – Stop after this category (inclusive end).

  • chunk_size – --chunk-size value passed to cc compute and qc compute_psd.

Returns:

Ordered list of RunStep namedtuples.

Return type:

list[RunStep]
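The ordering step can be sketched as a plain topological sort over (parent, child) category links, dropping pass-through categories. A minimal, self-contained illustration — the link tuples and category names below are assumptions, not the actual WorkflowLink schema:

```python
from collections import deque

# Minimal sketch of the ordering run_workflow_plan() performs: Kahn's
# topological sort over (parent, child) links, with pass-through
# categories omitted from the plan. Link data is illustrative only.
def order_categories(links, passthrough=("filter", "global")):
    nodes = {n for link in links for n in link}
    indeg = {n: 0 for n in nodes}
    children = {n: [] for n in nodes}
    for parent, child in links:
        children[parent].append(child)
        indeg[child] += 1
    queue = deque(sorted(n for n in nodes if indeg[n] == 0))
    plan = []
    while queue:
        n = queue.popleft()
        if n not in passthrough:       # pass-throughs order but don't run
            plan.append(n)
        for c in children[n]:
            indeg[c] -= 1
            if indeg[c] == 0:
                queue.append(c)
    return plan

links = [("preprocess", "cc"), ("cc", "filter"),
         ("filter", "stack"), ("stack", "mwcs")]
plan = order_categories(links)
```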

I/O

MSNoise xarray I/O for all result types (CCF, MWCS, DTT, STR, WCT, DVV, PSD).

msnoise.core.io.aggregate_dvv_pairs(root, parent_lineage, parent_step_name, parent_category: str, mov_stack, component: str, pair_type: str, pairs, params) → Dataset

Aggregate per-pair DTT/STR/WCT-DTT results into network-level dv/v statistics.

Reads all per-pair output files for the given (mov_stack, component) combination, extracts the appropriate dv/v and error columns per pair type, applies quality filtering, then computes across-pair statistics at each time step.

Parameters:
  • root – Output folder root.

  • parent_lineage – Lineage name list up to and including the parent DTT step (e.g. ["preprocess_1", ..., "mwcs_dtt_1"]).

  • parent_step_name – Step name of the parent DTT step.

  • parent_category – "mwcs_dtt", "stretching", or "wavelet_dtt".

  • mov_stack – Tuple (window, step) e.g. ("1D", "1D").

  • component – Component string e.g. "ZZ".

  • pair_type – "CC", "SC", "AC", or "ALL".

  • pairs – Iterable of (sta1, sta2) SEED-id string tuples.

  • params – Params object from get_params().

Returns:

xarray.Dataset with dim times and stat variables.

Raises:

ValueError – if no data files are found for the given combination.

msnoise.core.io.psd_ppsd_to_dataframe(ppsd)
msnoise.core.io.psd_ppsd_to_dataset(ppsd)

Convert an ObsPy PPSD object to an xarray.Dataset.

Builds the same data as psd_ppsd_to_dataframe() but returns an xr.Dataset with a PSD variable of dims (times, periods) — ready to pass directly to xr_save_psd() without a DataFrame round-trip.

Parameters:

ppsd – PPSD object.

Returns:

xarray.Dataset.

msnoise.core.io.psd_read_results(net, sta, loc, chan, datelist, format='PPSD', use_cache=True)
msnoise.core.io.save_daily_ccf(root, lineage, step_name, station1, station2, components, date, corr, taxis)

Save a daily-stacked CCF to NetCDF using xr_save_ccf_daily().

Replaces the legacy add_corr() / _export_mseed / _export_sac pipeline. Output lives at:

<root>/<lineage>/<step_name>/_output/daily/<components>/<sta1>_<sta2>/<date>.nc
Parameters:
  • root – Output folder (params.global_.output_folder).

  • lineage – List of step-name strings for the lineage path.

  • step_name – Current step name (e.g. "cc_1").

  • station1 – First station SEED id NET.STA.LOC.

  • station2 – Second station SEED id NET.STA.LOC.

  • components – Component pair string e.g. "ZZ".

  • date – Calendar date (datetime.date or ISO string).

  • corr – 1-D numpy array, the stacked CCF.

  • taxis – 1-D lag-time axis array.
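The documented path layout can be reproduced with a few joins. The helper name below is hypothetical (the real function builds the path internally); the station and step names are illustrative:

```python
import posixpath

# Sketch of the daily-CCF path layout documented above. daily_ccf_path
# is a hypothetical helper for illustration only.
def daily_ccf_path(root, lineage, step_name, components, sta1, sta2, date):
    return posixpath.join(
        root, *lineage, step_name, "_output", "daily",
        components, f"{sta1}_{sta2}", f"{date}.nc")

path = daily_ccf_path("OUTPUT", ["preprocess_1"], "cc_1",
                      "ZZ", "BE.MEM.00", "BE.STI.00", "2024-01-15")
```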

msnoise.core.io.xr_get_ccf(root, lineage, station1, station2, components, mov_stack, taxis)

Load CCF results from a NetCDF file.

Returns:

xarray.DataArray with dims (times, taxis), lazily memory-mapped. Call .load() or .values when you need numpy.

Raises:

FileNotFoundError – if the NetCDF file does not exist.

Note

xr_get_ccf (readers: s05/s08/s10) and xr_save_ccf (writer: s04) operate on strictly disjoint workflow steps — the file is never read and written concurrently, so keeping the handle lazy is safe.

msnoise.core.io.xr_get_ccf_all(root, lineage, step_name, station1, station2, components, date)

Load all windowed CCFs for one day written by xr_save_ccf_all().

Returns:

xarray.DataArray with dims (times, taxis).

Raises:

FileNotFoundError – if the NetCDF file does not exist.

msnoise.core.io.xr_get_ccf_daily(root, lineage, step_name, station1, station2, components, date)

Load a single daily-stacked CCF written by xr_save_ccf_daily().

Returns:

xarray.DataArray with dim taxis.

Raises:

FileNotFoundError – if the NetCDF file does not exist.

msnoise.core.io.xr_get_dtt(root, lineage, station1, station2, components, mov_stack)

Load DTT results from a NetCDF file.

Returns:

xarray.Dataset with a DTT variable and dims (times, keys).

Raises:

FileNotFoundError – if the NetCDF file does not exist.

msnoise.core.io.xr_get_dvv_agg(root, lineage, step_name, mov_stack, pair_type: str, component: str)

Load a DVV aggregate result from a NetCDF file.

Returns:

xarray.Dataset (lazy).

Raises:

FileNotFoundError – if the file does not exist.

msnoise.core.io.xr_get_dvv_pairs(root, lineage, step_name, mov_stack, pair_type: str, component: str)

Load per-pair dv/v time series from a NetCDF file.

Returns:

xarray.Dataset with dims (pair, times) and variables dvv and err.

Raises:

FileNotFoundError – if the file does not exist.

msnoise.core.io.xr_get_mwcs(root, lineage, station1, station2, components, mov_stack)

Load MWCS results from a NetCDF file.

Returns:

xarray.Dataset with a MWCS variable and dims (times, taxis, keys).

Raises:

FileNotFoundError – if the NetCDF file does not exist.

msnoise.core.io.xr_get_ref(root, lineage, station1, station2, components, taxis, ignore_network=False)
msnoise.core.io.xr_get_wct_dtt(root, lineage, station1, station2, components, mov_stack)

Load per-pair WCT dt/t results from a NetCDF file.

Returns an xarray.Dataset with variables DTT, ERR, COH each of shape (times, frequency), as written by xr_save_wct_dtt().

Raises:

FileNotFoundError – if the NetCDF file does not exist.

Return type:

xarray.Dataset

msnoise.core.io.xr_load_ccf_for_stack(root, lineage_names, station1, station2, components, dates)

Load per-day CCF data for stacking — the higher-level replacement for get_results_all.

Reads the daily CCF NetCDF files written by s03_compute_no_rotation() for the requested station pair, component and date list. Tries keep_all (per-window, _output/all/) first; falls back to keep_days (daily stacks, _output/daily/) if the former are absent.

Path layout (written by s03):

<root> / *cc_lineage / filter_step / _output / all|daily / <comp> / <sta1>_<sta2> / <date>.nc

Data are stored internally as float32 to halve the working-set size relative to the float64 values written by s03. Precision loss is negligible for the rolling-mean operations performed by the stack worker.

Parameters:
  • root – Output folder (params.global_.output_folder).

  • lineage_names – Full lineage name list including the filter step, e.g. ["preprocess_1", "cc_1", "filter_1", "stack_1"].

  • station1 – First station SEED id NET.STA.LOC.

  • station2 – Second station SEED id NET.STA.LOC.

  • components – Component pair string e.g. "ZZ".

  • dates – Iterable of datetime.date or ISO date strings.

Returns:

xarray.Dataset with variable CCF and dims (times, taxis), sorted by times. Empty Dataset if no data found.

msnoise.core.io.xr_load_psd(root, lineage, step_name, seed_id, day)

Load a daily PSD NetCDF written by xr_save_psd().

Parameters:
format (str) – "dataframe" returns a DataFrame or None if the file is not found; "dataset" (default) returns an xarray.Dataset or None.

msnoise.core.io.xr_load_rms(root, lineage, step_name, seed_id)

Load per-station PSD RMS results from a NetCDF file.

Parameters:
format (str) – "dataframe" returns a DataFrame or None if the file is not found; "dataset" (default) returns an xarray.Dataset or None.

msnoise.core.io.xr_load_wct(root, lineage, station1, station2, components, mov_stack)

Load WCT results from a NetCDF file.

Returns:

xarray.Dataset with variables WXamp, Wcoh, WXdt and dims (times, freqs, taxis).

Raises:

FileNotFoundError – if the NetCDF file does not exist.

Pure read — xr_save_wct_dtt() writes to a different path, so keeping this handle lazy is safe.

msnoise.core.io.xr_save_ccf(root, lineage, step_name, station1, station2, components, mov_stack, taxis, new, overwrite=False)
msnoise.core.io.xr_save_ccf_all(root, lineage, step_name, station1, station2, components, date, window_times, taxis, corrs)

Save all windowed CCFs for one day as a single NetCDF file.

Replaces the legacy HDF5-based export_allcorr. One file per calendar day — safe for concurrent workers since each worker owns exactly one day.

Path layout:

<root>/<lineage>/<step_name>/_output/all/<components>/<sta1>_<sta2>/<date>.nc
Parameters:
  • window_times – 1-D array of window start times (datetime strings or datetime objects) — the times dimension.

  • taxis – 1-D lag-time axis array — the taxis dimension.

  • corrs – 2-D numpy array (n_windows, n_taxis) of CCF data.

msnoise.core.io.xr_save_ccf_daily(root, lineage, step_name, station1, station2, components, date, taxis, corr)

Save a single daily-stacked CCF as a per-day NetCDF file.

One file per calendar day — safe for concurrent workers since each worker owns exactly one day.

Path layout:

<root>/<lineage>/<step_name>/_output/daily/<components>/<sta1>_<sta2>/<YYYY-MM-DD>.nc
Parameters:
  • corr – 1-D numpy array of the stacked CCF.

  • taxis – 1-D lag-time axis array.

  • date – datetime.date or ISO string "YYYY-MM-DD".

msnoise.core.io.xr_save_dtt(root, lineage, step_name, station1, station2, components, mov_stack, dataset)

Save DTT results to a NetCDF file.

Parameters:

dataset – xarray.Dataset with a DTT variable and dims (times, keys), as built by s06_compute_mwcs_dtt.

msnoise.core.io.xr_save_dvv_agg(root, lineage, step_name, mov_stack, pair_type: str, component: str, dataset)

Save a DVV aggregate result to a NetCDF file.

Path layout:

<root>/<lineage>/<step_name>/_output/<mov_stack>/dvv_<pair_type>_<component>.nc
Parameters:

dataset – xarray.Dataset with dim times and stat variables as built by aggregate_dvv_pairs().

msnoise.core.io.xr_save_dvv_pairs(root, lineage, step_name, mov_stack, pair_type: str, component: str, dataset)

Save per-pair dv/v time series to a NetCDF file.

Path layout:

<root>/<lineage>/<step_name>/_output/<mov_stack>/dvv_pairs_<pair_type>_<component>.nc
Parameters:

dataset – xarray.Dataset with dims (pair, times) and variables dvv and err, as built by aggregate_dvv_pairs().

msnoise.core.io.xr_save_mwcs(root, lineage, step_name, station1, station2, components, mov_stack, taxis, dataset)

Save MWCS results to a NetCDF file.

Parameters:

dataset – xarray.Dataset with a MWCS variable and dims (times, taxis, keys), as built by s05_compute_mwcs.

msnoise.core.io.xr_save_psd(root, lineage, step_name, seed_id, day, dataset)

Save a daily PSD result to a NetCDF file.

Path layout:

<root>/<lineage>/<step_name>/_output/daily/<seed_id>/<YYYY-MM-DD>.nc
Parameters:

dataset – xarray.Dataset with a PSD variable and dims (times, periods), as built by psd_ppsd_to_dataset().

msnoise.core.io.xr_save_ref(root, lineage, step_name, station1, station2, components, taxis, new, overwrite=False)
msnoise.core.io.xr_save_rms(root, lineage, step_name, seed_id, dataframe)

Save per-station PSD RMS results to a NetCDF file.

Accepts either a DataFrame (legacy, index = DatetimeIndex, columns = band labels) or an xarray.Dataset with a RMS variable and dims (times, bands).

msnoise.core.io.xr_save_stretching(root, lineage, step_name, station1, station2, components, mov_stack, dataset)

Save per-pair stretching results to a NetCDF file.

Path layout:

<root>/<lineage>/<step_name>/_output/<mov_stack[0]>_<mov_stack[1]>/<components>/<sta1>_<sta2>.nc
Parameters:

dataset – xarray.Dataset with a STR variable and dims (times, keys) where keys = ['Delta', 'Coeff', 'Error'], as built by s10_stretching.

msnoise.core.io.xr_save_wct(root, lineage, step_name, station1, station2, components, mov_stack, dataset)

Save WCT results to a NetCDF file.

Parameters:

dataset – xarray.Dataset with variables WXamp, Wcoh, WXdt and dims (times, freqs, taxis), as built by s08_compute_wct.

msnoise.core.io.xr_save_wct_dtt(root, lineage, step_name, station1, station2, components, mov_stack, taxis, dataset)

Save WCT-DTT results to a NetCDF file.

Parameters:

dataset – xarray.Dataset with variables DTT, ERR, COH and dims (times, frequency), as built by s09_compute_wct_dtt.

Signal Processing

MSNoise signal processing, preprocessing helpers, and stacking utilities.

msnoise.core.signal.check_and_phase_shift(trace, taper_length=20.0)
msnoise.core.signal.compute_wct_dtt(freqs, tvec, WXamp, Wcoh, delta_t, lag_min=5, coda_cycles=20, mincoh=0.5, maxdt=0.2, min_nonzero=0.25, freqmin=0.1, freqmax=2.0)

Compute dv/v and associated errors from WCT results following Mao et al.[1].

For each frequency band, fits a weighted linear regression delta_t(t) = -(dv/v) * t using log-amplitude weights from the cross-wavelet spectrum (their eq. 3–4).

Parameters:
  • freqs – Frequency values from the WCT.

  • tvec – Time axis.

  • WXamp – Cross-wavelet amplitude array (freqs × taxis).

  • Wcoh – Wavelet coherence array (freqs × taxis).

  • delta_t – Time delay array (freqs × taxis).

  • lag_min – Minimum coda lag in seconds.

  • coda_cycles – Number of periods to use as coda window width.

  • mincoh – Minimum coherence threshold.

  • maxdt – Maximum allowed time delay.

  • min_nonzero – Minimum fraction of valid (non-zero weight) samples required.

  • freqmin – Lower frequency bound for regression.

  • freqmax – Upper frequency bound for regression.

Returns:

Tuple of (dt/t, err, weighting_function).

msnoise.core.signal.find_segments(data, gap_threshold)

Identify continuous non-NaN segments in an xarray DataArray.

Parameters:
  • data – 2-D xarray DataArray (times × lags).

  • gap_threshold – Maximum index gap before treating as a new segment.

Returns:

List of lists of row indices forming each continuous segment.
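The grouping rule can be illustrated in pure Python. This sketch takes the valid (non-NaN) row indices directly instead of inspecting a 2-D DataArray, which is an assumption made for brevity:

```python
# Pure-Python sketch of the segment logic: consecutive valid row indices
# belong to one segment; an index gap larger than gap_threshold starts a
# new one. The real function derives the valid indices from NaN rows of
# a 2-D DataArray.
def find_segments(valid_rows, gap_threshold):
    segments, current = [], []
    for idx in valid_rows:
        if current and idx - current[-1] > gap_threshold:
            segments.append(current)
            current = []
        current.append(idx)
    if current:
        segments.append(current)
    return segments
```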

msnoise.core.signal.getCoherence(dcs, ds1, ds2)

Compute cross-coherence between two spectra.

Parameters:
  • dcs – Cross-spectrum magnitudes (1-D array, length n).

  • ds1 – Auto-spectrum of signal 1 (1-D array, length n).

  • ds2 – Auto-spectrum of signal 2 (1-D array, length n).

Returns:

Complex coherence array of length n, clipped to |coh| <= 1.
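The underlying formula is coh = dcs / sqrt(ds1 * ds2), clipped to 1. A sketch on plain lists — it returns real magnitudes for illustration, whereas the docstring above describes a complex array:

```python
import math

# Sketch of the coherence formula: |coh| = dcs / sqrt(ds1 * ds2),
# clipped so that |coh| <= 1. Zero-power bins yield zero coherence here,
# an assumption made to keep the sketch total.
def coherence(dcs, ds1, ds2):
    out = []
    for c, a, b in zip(dcs, ds1, ds2):
        denom = math.sqrt(a * b)
        out.append(min(c / denom, 1.0) if denom > 0 else 0.0)
    return out
```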

msnoise.core.signal.getGaps(stream, min_gap=None, max_gap=None)
msnoise.core.signal.get_preprocessed_stream(output_dir, step_name, goal_day, stations)

Read per-station preprocessed files and return a merged Stream.

Counterpart to save_preprocessed_streams(). Reads only the station files needed for stations (a list of "NET.STA.LOC" strings) and returns them merged into a single Stream.

Missing station files are silently skipped (the cc worker checks for empty streams downstream).

Parameters:
  • output_dir – Base output directory.

  • step_name – Workflow step name (e.g. "preprocess_1").

  • goal_day – Processing date string (YYYY-MM-DD).

  • stations – List of "NET.STA.LOC" strings to load.

Returns:

Stream.

msnoise.core.signal.get_wavelet_type(wavelet_type)

Return an internal wavelet object for the given type/parameter pair.

Parameters:

wavelet_type – Tuple (name, param) or (name,). Supported names: "Morlet", "Paul", "DOG", "MexicanHat".

Returns:

Corresponding wavelet instance (_Morlet, _Paul, _DOG, _MexicanHat).

msnoise.core.signal.get_wct_avgcoh(freqs, tvec, wcoh, freqmin, freqmax, lag_min=5, coda_cycles=20)

Calculate average wavelet coherence over a frequency range and coda window.

Parameters:
  • freqs – Frequency array.

  • tvec – Time axis.

  • wcoh – Wavelet coherence array (freqs × taxis).

  • freqmin – Lower frequency bound.

  • freqmax – Upper frequency bound.

  • lag_min – Minimum coda lag in seconds.

  • coda_cycles – Number of periods to use as coda window width.

Returns:

Average coherence per frequency bin within [freqmin, freqmax].

msnoise.core.signal.get_window(window='boxcar', half_win=3)

Return a normalised complex smoothing window for MWCS processing.

Parameters:
  • window"boxcar" (default) or "hanning".

  • half_win – Half-width in samples (full window = 2*half_win+1).

Returns:

Complex numpy array of length 2*half_win+1, sum-normalised.

msnoise.core.signal.make_same_length(st)

Take a stream whose traces share a single sampling rate and ensure that all channels have the same length and the same gaps.

msnoise.core.signal.nextpow2(x)

Returns the next power of 2 of x.

Parameters:

x (int) – any value

Return type:

int

Returns:

the next power of 2 of x
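A minimal sketch, assuming "the next power of 2" means the exponent k with 2**k >= x (some implementations return 2**k itself — check the source if the distinction matters):

```python
import math

# Sketch of a typical nextpow2: smallest integer k with 2**k >= x.
def nextpow2(x):
    return int(math.ceil(math.log2(x))) if x > 1 else 0
```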

msnoise.core.signal.preload_instrument_responses(session, return_format='dataframe')

This function preloads all instrument responses from response_path and stores the seed ids, start and end dates, and paz for every channel in a DataFrame. Any file readable by obspy’s read_inventory will be processed.

Return type:

DataFrame or Inventory

Returns:

A table containing all channels with the time of operation and poles and zeros (DataFrame), or an obspy Inventory object.

msnoise.core.signal.prepare_abs_positive_fft(line, sampling_rate)

Return the positive-frequency part of the absolute FFT of line.

Parameters:
  • line – 1-D signal array.

  • sampling_rate – Sampling rate in Hz.

Returns:

(freq, val) – positive-frequency vector and absolute FFT values.

msnoise.core.signal.psd_df_rms(d, freqs, output='VEL')

Compute per-frequency-band RMS from PSD data.

Parameters:
  • d – xarray.Dataset with a PSD variable and dims (times, periods), as returned by xr_load_psd().

  • freqs – List of (fmin, fmax) tuples defining frequency bands.

  • output – Physical unit: "VEL" (default), "ACC", or "DISP".

Returns:

xarray.Dataset with one RMS variable and dims (times, bands), ready for xr_save_rms().

msnoise.core.signal.psd_rms(s, f)

Compute RMS from a power spectrum array and frequency array.

Parameters:
  • s – Power spectral density values (1-D array).

  • f – Frequency values (1-D array, same length as s).

Returns:

Float – square root of the integrated power.
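"Square root of the integrated power" can be sketched with trapezoidal integration over frequency. An illustrative pure-Python analogue; the actual integration scheme used internally is an assumption here:

```python
import math

# Sketch of psd_rms: integrate the power spectral density over
# frequency (trapezoidal rule assumed), then take the square root.
def rms_from_psd(s, f):
    power = sum((s[i] + s[i + 1]) / 2.0 * (f[i + 1] - f[i])
                for i in range(len(f) - 1))
    return math.sqrt(power)
```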

msnoise.core.signal.save_preprocessed_streams(stream, output_dir, step_name, goal_day)

Write preprocessed traces to per-station files.

Output layout:

<output_dir>/<step_name>/_output/<goal_day>/<NET.STA.LOC>.mseed

One file per station (all channels for that station in the same file). This is concurrency-safe: each station writes its own file with no shared state between parallel workers.

Parameters:
  • stream – Stream to write.

  • output_dir – Base output directory (params.global_.output_folder).

  • step_name – Workflow step name (e.g. "preprocess_1").

  • goal_day – Processing date string (YYYY-MM-DD).

Returns:

List of written file paths (one per station).

msnoise.core.signal.smoothCFS(cfs, scales, dt, ns, nt)

Smooth CWT coefficients along both time and scale axes.

Parameters:
  • cfs – CWT coefficient array, shape (n_scales, n_times).

  • scales – 1-D array of wavelet scales.

  • dt – Sampling interval in seconds.

  • ns – Length of the moving-average filter across scales.

  • nt – Gaussian width parameter along time.

Returns:

Smoothed coefficient array, same shape as cfs.

msnoise.core.signal.stack(data, stack_method='linear', pws_timegate=10.0, pws_power=2, goal_sampling_rate=20.0, freqmin=1.0, freqmax=10.0, tfpws_nscales=20)

Stack an array of CCF traces into a single representative trace.

Three methods are available, selected via stack_method:

Linear stack ("linear")

The arithmetic mean across all input traces:

\[s(t) = \frac{1}{N} \sum_{j=1}^{N} d_j(t)\]

Incoherent noise cancels as \(1/\sqrt{N}\). Fastest and most transparent, but offers no protection against high-amplitude transients (earthquakes, instrumental glitches) that survive pre-processing.

Phase-weighted stack ("pws")

Introduced by Schimmel & Paulssen (1997) [2]. Each sample is weighted by the instantaneous phase coherence \(c(t)\) of the analytic signal across all traces:

\[c(t) = \frac{1}{N} \left| \sum_{j=1}^{N} e^{i\,\phi_j(t)} \right|, \qquad c(t) \in [0, 1]\]

where \(\phi_j(t) = \arg\bigl(d_j(t) + i\,\mathcal{H}\{d_j\}(t)\bigr)\) is the instantaneous phase of trace j obtained via the Hilbert transform \(\mathcal{H}\). The coherence is smoothed with a boxcar window of pws_timegate seconds before being raised to the power v = pws_power:

\[s(t) = \frac{1}{N} \sum_{j=1}^{N} d_j(t) \cdot c(t)^v\]

High-amplitude incoherent transients have random phases across traces, so \(c(t) \approx 0\) at those times; coherent arrivals have \(c(t) \approx 1\) and are preserved.

Time-frequency phase-weighted stack ("tfpws")

The TF extension of PWS by Schimmel & Gallart (2007) [3]. Phase coherence is computed in the time-frequency domain via a continuous wavelet transform (CWT) with a complex Morlet wavelet, giving a coherence map \(c(a, t)\) that is both scale- and time-dependent. Averaging over the nscales log-spaced scales spanning [freqmin, freqmax] Hz yields a single per-lag weight:

\[W_j(a, t) = \mathcal{W}\{d_j\}(a, t)\]
\[c(a, t) = \frac{1}{N} \left| \sum_{j=1}^{N} e^{i\,\arg W_j(a,t)} \right|\]
\[w(t) = \left[ \frac{1}{A} \sum_{a} c(a, t) \right]^v, \qquad s(t) = \frac{1}{N} \sum_{j=1}^{N} d_j(t) \cdot w(t)\]

where \(A\) is the number of scales. Because coherence is evaluated independently at each scale, tf-PWS is more sensitive to narrow-band coherent arrivals than time-domain PWS. It is particularly effective for noise autocorrelations where body-wave reflections occupy a limited frequency band (Romero & Schimmel 2018 [4]).

Note

Memory scales as \(O(N \times A \times T)\) complex128. For long CCFs or large archives consider reducing nscales (default 20) or chunking pairs outside this function.

Parameters:
  • data (numpy.ndarray) – 2-D array of shape (N_traces, N_lags), each row one CCF.

  • stack_method (str) – "linear", "pws", or "tfpws".

  • pws_timegate (float) – Boxcar smoothing window for the PWS coherence estimate, in seconds ("pws" only). Default 10 s.

  • pws_power (float) – Exponent v applied to the coherence weight. Larger values increase selectivity. Shared by "pws" and "tfpws". Default 2.

  • goal_sampling_rate (float) – Sampling rate of the CCF array (Hz).

  • freqmin (float) – Lower frequency bound (Hz) for the CWT scale range — "tfpws" only. Should match the parent filter’s freqmin.

  • freqmax (float) – Upper frequency bound (Hz) for the CWT scale range — "tfpws" only. Should match the parent filter’s freqmax.

  • tfpws_nscales (int) – Number of log-spaced CWT scales between freqmin and freqmax"tfpws" only. Default 20.

Return type:

numpy.ndarray

Returns:

1-D stacked CCF of length N_lags, or [] if no valid (non-NaN) traces are present.
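The linear and phase-weighted formulas above can be made concrete on toy lists. This sketch supplies the instantaneous phases directly instead of deriving them via the Hilbert transform, and omits the time-gate smoothing — both simplifications relative to the real implementation:

```python
import cmath

# Toy sketch of the stack formulas above. Phases phi_j(t) are given
# directly (no Hilbert transform) and no pws_timegate smoothing is
# applied -- illustration only.
def linear_stack(traces):
    n = len(traces)
    return [sum(col) / n for col in zip(*traces)]

def phase_weighted_stack(traces, phases, power=2):
    n = len(traces)
    stacked = []
    for col, phi_col in zip(zip(*traces), zip(*phases)):
        # c(t) = |mean of unit phasors|, in [0, 1]
        c = abs(sum(cmath.exp(1j * p) for p in phi_col)) / n
        stacked.append(sum(col) / n * c ** power)
    return stacked
```

With identical phases across traces c(t) = 1 and the PWS reduces to the linear stack; with opposing phases the weight collapses toward zero, suppressing incoherent energy.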

msnoise.core.signal.validate_stack_data(dataset, stack_type='reference')

Validate stack data before processing.

Parameters:
  • dataset – xarray Dataset to validate.

  • stack_type – Type of stack ("reference" or "moving") for error messages.

Returns:

(is_valid, message) tuple

msnoise.core.signal.wiener_filt(data, M, N, gap_threshold)

Apply a 2-D Wiener filter to the CCF dataset, segment by segment.

Operates only on continuous (non-NaN) segments of the time axis to avoid smearing across data gaps.

Parameters:
  • data – xarray Dataset containing a CCF variable (times × taxis).

  • M – Wiener filter window size along the time axis.

  • N – Wiener filter window size along the lag axis.

  • gap_threshold – Passed to find_segments().

Returns:

Copy of data with the filtered CCF variable.

msnoise.core.signal.winsorizing(data, params, input='timeseries', nfft=0)

Clip (Winsorise) a 2-D data array in the time or frequency domain.

Supports both one-shot sign-clipping (winsorizing == -1) and RMS-based clipping (winsorizing > 0). When input is "fft" the array is temporarily transformed back to the time domain, clipped, then re-transformed.

Parameters:
  • data – 1-D or 2-D array of shape (n_traces, n_samples).

  • params – MSNoise params object; must expose params.cc.winsorizing.

  • input"timeseries" (default) or "fft".

  • nfft – FFT length used when input is "fft"; ignored otherwise.

Returns:

Clipped array (same shape as input).
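The RMS-based branch (winsorizing > 0) clips each trace at a multiple of its own RMS. A minimal sketch on plain lists, assuming per-trace RMS and a symmetric clip:

```python
import math

# Sketch of the RMS-based clipping branch (winsorizing > 0): clip each
# trace at +/- k * RMS. The sign-clipping branch (winsorizing == -1)
# would instead replace every sample by its sign. Illustration only.
def winsorize_rms(traces, k):
    out = []
    for tr in traces:
        rms = math.sqrt(sum(v * v for v in tr) / len(tr))
        lim = k * rms
        out.append([max(-lim, min(lim, v)) for v in tr])
    return out
```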

msnoise.core.signal.xwt(trace_ref, trace_current, fs, ns=3, nt=0.25, vpo=12, freqmin=0.1, freqmax=8.0, nptsfreq=100, wavelet_type=('Morlet', 6.0))

Wavelet Coherence Transform (WCT) between two time series (Grinsted et al.[5]; traveltime estimation: Mao et al.[1]).

Convenience wrapper around prepare_ref_wct() + apply_wct(). Use those two functions directly in hot loops (Mode A fixed-REF) to avoid recomputing the reference CWT on every call.

Parameters:
  • trace_ref – Reference signal (1-D array).

  • trace_current – Current signal (1-D array, same length).

  • fs – Sampling frequency in Hz.

  • ns – Scale-axis smoothing parameter.

  • nt – Time-axis smoothing parameter.

  • vpo – Voices-per-octave; higher = finer scale resolution.

  • freqmin – Lowest frequency of interest (Hz).

  • freqmax – Highest frequency of interest (Hz).

  • nptsfreq – Number of frequency points between freqmin and freqmax.

  • wavelet_type(name, param) tuple passed to get_wavelet_type().

Returns:

(WXamp, WXspec, WXangle, Wcoh, WXdt, freqs, coi)

Computation (CC / MWCS / WCT)

MSNoise core computation functions — cross-correlation, whitening, MWCS.

Moved from msnoise/move2obspy.py.

msnoise.core.compute.mwcs(current, reference, freqmin, freqmax, df, tmin, window_length, step, smoothing_half_win=5)

Moving Window Cross-Spectrum (MWCS) time-delay measurement.

Both time series are sliced into overlapping windows. Each window is mean-adjusted and cosine-tapered (85%) before being Fourier-transformed.

The cross-spectrum between the reference and current windows is:

\[X(\nu) = F_\text{ref}(\nu)\, F_\text{cur}^*(\nu)\]

where \({}^*\) denotes complex conjugation. \(X(\nu)\) is smoothed by convolution with a Hanning window. Cross-coherency is then:

\[C(\nu) = \frac{ \left| \overline{X(\nu)} \right| }{ \sqrt{ \overline{\left|F_\text{ref}(\nu)\right|^2}\; \overline{\left|F_\text{cur}(\nu)\right|^2} } }\]

where the over-bar denotes smoothing. The mean coherence per window is the mean of \(C(\nu)\) over the frequency band.

The unwrapped phase of \(X(\nu)\) is linearly proportional to frequency:

\[\phi_j = 2\pi\,\delta t\,\nu_j\]

so the time delay \(\delta t\) is the slope of a weighted linear regression over the frequency band (Clarke et al.[6], extending Poupinet et al.[7]). Weights incorporate both cross-spectral amplitude and coherence. The slope error is:

\[e_m = \sqrt{ \sum_j \left( \frac{w_j\,\nu_j}{\sum_i w_i\,\nu_i^2} \right)^2 \sigma_\phi^2 }, \qquad \sigma_\phi^2 = \frac{\sum_j (\phi_j - m\,\nu_j)^2}{N-1}\]

where \(w_j\) are the per-sample weights and \(\nu_j\) the discrete frequencies of the band.

Returns one row per moving window: central lag time, \(\delta t\), error, and mean coherence.

Warning

The time series will not be filtered before computing the cross-spectrum! They should be band-pass filtered around the freqmin-freqmax band of interest beforehand.

Parameters:
  • current (numpy.ndarray) – The “Current” timeseries

  • reference (numpy.ndarray) – The “Reference” timeseries

  • freqmin (float) – The lower frequency bound to compute the dephasing (in Hz)

  • freqmax (float) – The higher frequency bound to compute the dephasing (in Hz)

  • df (float) – The sampling rate of the input timeseries (in Hz)

  • tmin (float) – The leftmost time lag (used to compute the “time lags array”)

  • window_length (float) – The moving window length (in seconds)

  • step (float) – The step to jump for the moving window (in seconds)

  • smoothing_half_win (int) – If different from 0, defines the half length of the smoothing hanning window.

Return type:

numpy.ndarray

Returns:

[time_axis,delta_t,delta_err,delta_mcoh]. time_axis contains the central times of the windows. The three other columns contain dt, error and mean coherence for each window.
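The final regression step can be checked in isolation: since \(\phi_j = 2\pi\,\delta t\,\nu_j\), the delay is the weighted least-squares slope through the origin divided by \(2\pi\). A synthetic sketch — the real mwcs() also builds the windows, spectra, and weights, which are omitted here:

```python
import math

# Sketch of the phase-slope regression: recover dt from unwrapped
# phases phi_j = 2*pi*dt*nu_j via a weighted least-squares fit of a
# line through the origin. Illustration only.
def delay_from_phase(nu, phi, w):
    m = (sum(wi * n * p for wi, n, p in zip(w, nu, phi))
         / sum(wi * n * n for wi, n in zip(w, nu)))
    return m / (2.0 * math.pi)

# Synthetic check: phases generated with a known 0.05 s delay.
true_dt = 0.05
nu = [1.0, 2.0, 3.0, 4.0]
phi = [2.0 * math.pi * true_dt * n for n in nu]
dt = delay_from_phase(nu, phi, [1.0] * len(nu))
```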

msnoise.core.compute.myCorr2(data, maxlag, energy, index, plot=False, nfft=None, normalized=False)

Compute cross-correlations for all requested station pairs.

Tiled-batch implementation: processes pairs in chunks of _MYCORR2_CHUNK to amortise Python loop overhead while keeping peak memory bounded. Faster than the original per-pair loop for N > ~50 stations (~2-3x for N=200-500); falls back gracefully to near-loop speed for small N.

Parameters:
  • data (numpy.ndarray) – 2-D array (n_stations, nfft) containing the FFT of each pre-whitened time series.

  • maxlag (int) – Output half-length in samples; CCF returned over [-maxlag : maxlag] (length 2*maxlag + 1).

  • energy (numpy.ndarray) – Per-station RMS energy (n_stations,) used for POW normalisation.

  • index (list) – List of (ccf_id, sta1_idx, sta2_idx) tuples.

  • normalized (str or bool) – "POW", "MAX", "ABSMAX", or falsy for none.

Return type:

dict

Returns:

{ccf_id: ccf_array} for every pair in index.

msnoise.core.compute.pcc_xcorr(data, maxlag, energy, index, plot=False, nfft=None, normalized=False)

Phase Cross-Correlation v=2 (PCC2) — pure NumPy/SciPy implementation.

Replaces the former dependency on the unmaintained phasecorr package with a self-contained translation of the FFT-accelerated pcc2_set routine from FastPCC (Ventosa et al.[8]; Ventosa and Schimmel[9]).

Algorithm (matches FastPCC pcc2_set):

  1. Compute the amplitude-normalised analytic signal (phase signal) φ[n] = X_a[n] / |X_a[n]| for each trace — amplitude information is discarded entirely, so the result is insensitive to amplitude transients (earthquakes, glitches) without explicit temporal normalisation.

  2. Zero-pad to Nz = next_fast_len(N + maxlag) to avoid circular wrap-around (linear cross-correlation).

  3. Compute PCC2(lag) = IFFT(conj(FFT(φ1)) · FFT(φ2)) / (Nz · N) for every pair in index — O(N log N), same cost as GNCC.

Parameters:
  • data (numpy.ndarray) – 2-D time-domain array (n_stations, N); real-valued. Unlike myCorr2(), PCC2 requires the time-domain input because the Hilbert transform must be computed before any FFT.

  • maxlag (int or float) – Half-length of output CCF in samples.

  • energy – Unused (kept for API compatibility with myCorr2()).

  • index – List of (ccf_id, sta1_idx, sta2_idx) tuples.

  • normalized"MAX" or "ABSMAX" to normalise output; falsy for none. ("POW" is meaningless for PCC2 since amplitudes are discarded; it is silently ignored.)

Return type:

dict

Returns:

{ccf_id: ccf_array} of length 2*maxlag + 1 per pair.

References

msnoise.core.compute.smooth(x, window='boxcar', half_win=3)

Smooth a 1-D array with a symmetric window.

Pads the signal by reflection at both ends (length 2*half_win + 1) to reduce boundary effects, then convolves with a normalised window.

Parameters:
  • x (numpy.ndarray) – 1-D input array.

  • window (str) – "boxcar" (uniform, default) or "hanning".

  • half_win (int) – Half-width of the window; full width = 2*half_win+1.

Return type:

numpy.ndarray

Returns:

Smoothed array, same length as x.
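The padding-and-convolution recipe can be shown in pure Python for the boxcar case (the real function also supports "hanning"). The exact pad length used internally is an assumption here:

```python
# Pure-Python sketch of smooth() for the boxcar case: reflect-pad by
# half_win samples at each end (pad length is an assumption), then
# convolve with a sum-normalised boxcar of width 2*half_win + 1.
def smooth_boxcar(x, half_win=3):
    if half_win < 1:
        return list(x)
    # reflection padding at both ends
    padded = x[half_win:0:-1] + x + x[-2:-half_win - 2:-1]
    width = 2 * half_win + 1
    return [sum(padded[i:i + width]) / width for i in range(len(x))]
```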

msnoise.core.compute.whiten(data, Nfft, delta, freqmin, freqmax, plot=False, returntime=False)

Spectral whitening (1-bit / brutal mode) for a single real trace.

Computes the FFT of data, normalises the amplitude to unity in the passband [freqmin, freqmax], and returns the result. A 100-sample cosine taper transitions smoothly to zero outside the passband, ensuring no sharp spectral edges. This is the “brutal” whitening described by Bensen et al.[10] (their Section 2.2).

\[\tilde{X}[k] = \exp\!\left(i\,\arg X[k]\right) \qquad \text{for } \nu_k \in [f_\text{low},\, f_\text{high}]\]

Bins outside the passband are zeroed (with a cosine-tapered transition region); Hermitian symmetry is enforced so that the inverse FFT yields a real signal.

Parameters:
  • data (numpy.ndarray) – 1-D real time series.

  • Nfft (int) – FFT length (zero-padding if larger than len(data)).

  • delta (float) – Sampling interval in seconds (1 / sampling_rate).

  • freqmin (float) – Lower passband frequency (Hz).

  • freqmax (float) – Upper passband frequency (Hz).

  • plot (bool) – Show a diagnostic plot of the whitening stages (default False).

  • returntime (bool) – If True, return the whitened time-domain signal instead of the frequency-domain array.

Return type:

numpy.ndarray

Returns:

Whitened one-sided FFT array (complex) of length Nfft, or the corresponding real time-domain signal if returntime is True.
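
A minimal NumPy sketch of the "brutal" passband whitening (phase kept, amplitude set to unity), omitting the cosine taper and the plot/returntime options of the real function:

```python
import numpy as np

def whiten_sketch(data, nfft, delta, freqmin, freqmax):
    # One-sided spectrum and matching frequency axis.
    spec = np.fft.rfft(data, nfft)
    freqs = np.fft.rfftfreq(nfft, d=delta)
    band = (freqs >= freqmin) & (freqs <= freqmax)
    out = np.zeros_like(spec)
    # Keep only the phase inside the passband; zero everything outside.
    out[band] = np.exp(1j * np.angle(spec[band]))
    return out  # np.fft.irfft(out, nfft) would give the whitened trace
```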

msnoise.core.compute.whiten2(fft, Nfft, low, high, porte1, porte2, psds, whiten_type)

Vectorised in-place spectral whitening for a batch of pre-computed FFTs.

Operates on the one-sided positive-frequency half of each FFT row and enforces Hermitian symmetry afterward. Three modes are available via whiten_type, corresponding to the normalisation strategies described by Bensen et al.[10]:

  • brutal (default) — one-bit normalisation: amplitude set to unity inside the passband with a cosine taper at both edges:

    \[\tilde{X}[k] = \exp\!\left(i\,\arg X[k]\right)\]
  • HANN — one-bit normalisation weighted by a Hann window across the passband, smoothly tapering the spectral amplitude:

    \[\tilde{X}[k] = \frac{X[k]}{|X[k]|}\cdot w_\text{Hann}[k]\]
  • PSD — divide by a pre-computed smoothed PSD, then clip outlier bins at the 5th–95th percentile to suppress spectral spikes:

    \[\tilde{X}[k] = \operatorname{clip}\!\left( \frac{X[k]}{S[k]},\,-A,\,A \right)\]

    where \(S[k]\) is the smoothed PSD and \(A\) is the RMS of the non-outlier bins.

Parameters:
  • fft (numpy.ndarray) – 2-D complex array (n_traces, Nfft) of pre-computed FFTs. Modified in-place.

  • Nfft (int) – FFT length (must match fft.shape[1]).

  • low (int) – Bin index where the left cosine taper begins (below passband).

  • high (int) – Bin index where the right cosine taper ends (above passband).

  • porte1 (int) – First bin of the flat passband.

  • porte2 (int) – Last bin of the flat passband.

  • psds (numpy.ndarray or None) – Smoothed PSD array (n_traces, Nfft//2+1); used only when whiten_type is "PSD".

  • whiten_type (str) – One of "brutal" (default), "HANN", or "PSD".

Returns:

None (modifies fft in-place).

msnoise.core.compute.compute_wct_dtt_batch(freqs, taxis, WXamp, Wcoh, WXdt, dtt_params, dist: float = 0.0)

Compute WCT dt/t and average coherence for one time-step.

Shared implementation used by both the fused path in s08 (where WXamp/Wcoh/WXdt are freshly computed in memory) and the standalone s09 path (where they are loaded from a WCT NetCDF file).

Parameters:
  • freqs – 1-D frequency array from the WCT (Hz).

  • taxis – 1-D lag-time axis array (s).

  • WXamp – 2-D cross-wavelet amplitude array (freqs, taxis).

  • Wcoh – 2-D wavelet coherence array (freqs, taxis).

  • WXdt – 2-D time-delay array (freqs, taxis).

  • dtt_params – Merged params object containing wavelet_dtt.* attributes.

  • dist – Interstation distance in km (for dynamic lag). Default 0.

Returns:

Tuple (dtt_row, err_row, coh_row, freqs_subset) where dtt_row and err_row are 1-D arrays over the DTT frequency subset and coh_row is the average coherence per frequency bin.

msnoise.core.compute.resolve_wct_lag_min(dtt_params, dist: float) → float

Resolve the coda lag minimum for WCT dt/t computation.

Parameters:
  • dtt_params – Params object with wavelet_dtt attributes.

  • dist – Interstation distance in km (used for dynamic lag).

Returns:

Lag minimum in seconds.

msnoise.core.compute.build_wct_dtt_dataset(dates_list, dtt_rows, err_rows, coh_rows, freqs_subset)

Build a sorted xarray Dataset of WCT dt/t results.

Shared between the fused save path in s08 and the save path in s09.

Parameters:
  • dates_list – List of datetime64 timestamps.

  • dtt_rows – List of 1-D dt/t arrays (one per date).

  • err_rows – List of 1-D error arrays.

  • coh_rows – List of 1-D average-coherence arrays.

  • freqs_subset – 1-D frequency array for the DTT band.

Returns:

xarray.Dataset with variables DTT, ERR, COH and dims (times, frequency), sorted by times.

Preprocessing

Waveform Pre-processing

Pairs are first split and a station list is created. The database is then queried to get file paths. For each station, all files potentially containing data for the day are opened. The traces are then merged and split to obtain the most continuous chunks possible. The different chunks are then demeaned, tapered and merged again into a 1-day-long trace. If a chunk is not aligned on the sampling grid (that is, if it does not start at an integer multiple of the sample spacing in seconds), the chunk is phase-shifted in the frequency domain. This requires tapering and an FFT/iFFT round trip. If the gap between two chunks is small compared to preprocess_max_gap, the gap is filled with interpolated values; larger gaps are left unfilled.
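
The frequency-domain phase shift mentioned above follows the Fourier shift theorem: delaying a trace by τ seconds multiplies each spectral bin by exp(-2πifτ). A minimal sketch, ignoring the tapering the real routine applies (phase_shift_sketch is a hypothetical helper, not part of the API):

```python
import numpy as np

def phase_shift_sketch(trace, delta, tau):
    # Shift `trace` by a (possibly subsample) delay `tau` seconds;
    # `delta` is the sample spacing in seconds.
    n = len(trace)
    spec = np.fft.rfft(trace)
    freqs = np.fft.rfftfreq(n, d=delta)
    return np.fft.irfft(spec * np.exp(-2j * np.pi * freqs * tau), n)
```

For an integer-sample delay this reduces exactly to a circular shift; for fractional delays it interpolates between samples.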

Each 1-day-long trace is then high-passed (at preprocess_highpass Hz), then, if needed, low-passed (at preprocess_lowpass Hz) and decimated/downsampled. Decimation/downsampling is configurable (resampling_method) and users are advised to test Decimate. One advantage of Downsampling over Decimation is that it can downsample the data by any factor, not only integer factors. Downsampling is achieved with the ObsPy Lanczos resampler, which we tested against the old scikits.samplerate.

If configured, each 1-day long trace is corrected for its instrument response. Currently, only dataless seed and inventory XML are supported.

[Figure: overview of the preprocessing workflow]

As of MSNoise 1.5, the preprocessing routine is separated from compute_cc and can be used by plugins with their own parameters. The routine returns a Stream object containing all the traces for all the stations/components.

msnoise.core.preprocessing.apply_preprocessing_to_stream(stream, params, responses=None, logger=None)

Apply MSNoise signal-processing pipeline to a raw ObsPy Stream.

This is the core preprocessing logic extracted from preprocess() so it can be used for both local-archive and FDSN/EIDA data paths.

Operations applied (in order):

  1. Phase-shift alignment

  2. Gap detection and filling (up to preprocess_max_gap samples)

  3. Sampling-rate check and down-sampling (Resample / Decimate / Lanczos)

  4. Detrend, taper, highpass / lowpass filter

  5. Instrument response removal (if remove_response is set)

Parameters:
  • stream – Raw Stream for one station.

  • params – MSNoiseParams for this lineage.

  • responses – ObsPy Inventory for instrument response removal, or None.

  • logger – Python logger (defaults to msnoise.preprocessing).

Returns:

Processed Stream, or an empty stream if the data could not be processed.

msnoise.core.preprocessing.preprocess(stations, comps, goal_day, params, responses=None, loglevel='INFO', logger='msnoise.compute_cc_norot_child')

Fetches data for each station in stations and each component in comps, using the data_availability table in the database.

To correct for instrument responses, make sure to set remove_response to “Y” in the config and to provide the responses DataFrame.

Example:

>>> from msnoise.api import connect, get_params, preload_instrument_responses
>>> from msnoise.core.preprocessing import preprocess
>>> db = connect()
>>> params = get_params(db)
>>> responses = preload_instrument_responses(db)
>>> st = preprocess(db, ["YA.UV06","YA.UV10"], ["Z",], "2010-09-01", params, responses, loglevel="INFO")
>>> st
 2 Trace(s) in Stream:
YA.UV06.00.HHZ | 2010-09-01T00:00:00.000000Z - 2010-09-01T23:59:59.950000Z | 20.0 Hz, 1728000 samples
YA.UV10.00.HHZ | 2010-09-01T00:00:00.000000Z - 2010-09-01T23:59:59.950000Z | 20.0 Hz, 1728000 samples
Parameters:
  • db (sqlalchemy.orm.session.Session) – A Session object, as obtained by msnoise.api.connect().

  • stations (list of str) – a list of station names, in the format NET.STA.LOC

  • comps (list of str) – a list of component names among Z, N, E, 1, 2.

  • goal_day (str) – the day of data to load, ISO 8601 format: e.g. 2016-12-31.

  • params (class) – an object containing the config parameters, as obtained by msnoise.api.get_params().

  • responses (pandas.DataFrame) – a DataFrame containing the instrument responses, as obtained by msnoise.api.preload_instrument_responses().

Return type:

obspy.core.stream.Stream

Returns:

A Stream object containing all traces.

Stations & Data Sources

MSNoise station and data-availability management.

msnoise.core.stations.ARCHIVE_STRUCTURES = {'BUD': 'NET/STA/STA.NET.LOC.CHAN.YEAR.DAY', 'IDDS': 'YEAR/NET/STA/CHAN.TYPE/DAY/NET.STA.LOC.CHAN.TYPE.YEAR.DAY.HOUR', 'PDF': 'YEAR/STA/CHAN.TYPE/NET.STA.LOC.CHAN.TYPE.YEAR.DAY', 'SDS': 'YEAR/NET/STA/CHAN.TYPE/NET.STA.LOC.CHAN.TYPE.YEAR.DAY'}

Archive path templates keyed by structure name. Used by s01_scan_archive() and populate_stations().
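
A template like these can be expanded by plain token substitution. The sketch below is hypothetical; the real scanner's substitution rules (e.g. for TYPE and HOUR) may differ:

```python
def expand_archive_path(template, **tokens):
    # Replace longer token names first so shorter ones cannot clobber them.
    for key in sorted(tokens, key=len, reverse=True):
        template = template.replace(key, str(tokens[key]))
    return template

sds = "YEAR/NET/STA/CHAN.TYPE/NET.STA.LOC.CHAN.TYPE.YEAR.DAY"
path = expand_archive_path(sds, YEAR="2010", NET="YA", STA="UV06",
                           LOC="00", CHAN="HHZ", TYPE="D", DAY="244")
# path == "2010/YA/UV06/HHZ.D/YA.UV06.00.HHZ.D.2010.244"
```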

msnoise.core.stations.add_data_source(session, name, uri='', data_structure='SDS', auth_env='MSNOISE')

Add a new DataSource to the database.

Parameters:
  • session – SQLAlchemy session.

  • name – Human label e.g. "local", "IRIS", "EIDA".

  • uri – Data location URI. Schemes:
    - bare path or sds:///path → local SDS archive
    - fdsn://http://... → FDSN web service
    - eida://http://... → EIDA routing client

  • data_structure – SDS sub-path format for local sources (default "SDS"). Ignored for FDSN/EIDA.

  • auth_env – Environment variable prefix for credentials (default "MSNOISE"). Worker looks up {auth_env}_FDSN_USER, {auth_env}_FDSN_PASSWORD, {auth_env}_FDSN_TOKEN.

Returns:

The created DataSource object.

msnoise.core.stations.check_stations_uniqueness(session, station)
Parameters:
  • session

  • station

Returns:

msnoise.core.stations.get_data_availability(session, net=None, sta=None, loc=None, chan=None, starttime=None, endtime=None)

Returns the DataAvailability objects matching the given net, sta, loc, chan, starttime and/or endtime.

Parameters:
Return type:

list

Returns:

list of DataAvailability

msnoise.core.stations.get_data_source(session, id=None, name=None)

Retrieve a DataSource by id or name.

Parameters:
  • session – SQLAlchemy session.

  • id – Primary key of the DataSource.

  • name – Human label of the DataSource.

Returns:

DataSource or None.

Raises:

ValueError – If neither id nor name is provided.

msnoise.core.stations.get_default_data_source(session)

Return the project default DataSource.

The default is the DataSource with ref=1 (created by the installer).

Parameters:

session – SQLAlchemy session.

Returns:

DataSource.

Raises:

RuntimeError – If no DataSource exists (installer not run).

msnoise.core.stations.distance_from_coords(x1, y1, x2, y2, coordinates='DEG')

Compute interstation distance in km from raw coordinate values.

Pure function — no ORM objects required. Used internally by get_interstation_distance() and by MSNoiseResult.get_distance() in DB-free archive mode.

Parameters:
  • x1 – Longitude (DEG) or Easting (UTM) of station 1.

  • y1 – Latitude (DEG) or Northing (UTM) of station 1.

  • x2 – Longitude (DEG) or Easting (UTM) of station 2.

  • y2 – Latitude (DEG) or Northing (UTM) of station 2.

  • coordinates – "DEG" for WGS-84 lat/lon, "UTM" for metres.

Returns:

Distance in kilometres.

Return type:

float
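
For "DEG" coordinates the distance is presumably measured along the great circle; a self-contained haversine sketch on a spherical Earth (the real function may use a different geodetic formula and differ at the sub-km level):

```python
from math import radians, sin, cos, asin, sqrt

def haversine_km(lon1, lat1, lon2, lat2, radius_km=6371.0):
    # Great-circle distance in km on a spherical Earth of the given radius.
    lon1, lat1, lon2, lat2 = map(radians, (lon1, lat1, lon2, lat2))
    a = (sin((lat2 - lat1) / 2) ** 2
         + cos(lat1) * cos(lat2) * sin((lon2 - lon1) / 2) ** 2)
    return 2 * radius_km * asin(sqrt(a))
```

One degree of latitude corresponds to roughly 111 km with this formula.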

msnoise.core.stations.get_interstation_distance(station1, station2, coordinates='DEG')

Returns the distance in km between station1 and station2.

Warning

Currently both stations' coordinate systems have to be the same!

Parameters:
  • station1 (Station) – A Station object

  • station2 (Station) – A Station object

  • coordinates (str) – The coordinate system. “DEG” is WGS84 latitude/longitude in degrees. “UTM” is expressed in meters.

Return type:

float

Returns:

The interstation distance in km

msnoise.core.stations.get_new_files(session)

Returns the files marked “N”ew or “M”odified in the database

Parameters:

session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()

Return type:

list

Returns:

list of DataAvailability

msnoise.core.stations.get_station(session, net, sta)

Get one Station from the database.

Parameters:
Return type:

msnoise.msnoise_table_def.declare_tables.Station

Returns:

a Station Object

msnoise.core.stations.get_station_pairs(session, used=None, net=None, include_single_station=False)

Returns an iterator over all possible station pairs. If auto-correlation is configured in the database, returns N*N pairs, otherwise returns N*(N-1)/2 pairs.

Parameters:
  • session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()

  • used (bool, int) – Select only stations marked used if False (default) or all stations present in the database if True

  • net (str) – Network code to filter for the pairs.

Return type:

iterable

Returns:

An iterable of Station object pairs

msnoise.core.stations.get_stations(session, all=False, net=None, format='raw')

Get Stations from the database.

Parameters:
  • session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()

  • all (bool) – Returns all stations from the database if True, or only stations where used = 1 if False (default)

  • net (str) – if set, limits the stations returned to this network

Return type:

list of msnoise.msnoise_table_def.declare_tables.Station

Returns:

list of Station

msnoise.core.stations.get_waveform_path(session, da)

Reconstruct the full absolute path for a DataAvailability record.

Joins DataSource.uri with da.path and da.file. If the DA record has no data_source_id (legacy or NULL), falls back to treating da.path as an absolute path (backward-compatible behaviour).

Parameters:
Returns:

Absolute path string to the waveform file.
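
The join-with-fallback rule can be sketched as follows (resolve_waveform_path is a hypothetical stand-in; the real helper works on ORM objects):

```python
import os

def resolve_waveform_path(source_uri, da_path, da_file, legacy=False):
    # Legacy records (no data_source_id) carry an absolute da.path already.
    if legacy:
        return os.path.join(da_path, da_file)
    # Normal case: prefix the DataSource root.
    return os.path.join(source_uri, da_path, da_file)
```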

msnoise.core.stations.import_stationxml(session, path_or_url, data_source_id=None, save_to_response_path=True)

Import stations from a StationXML file or URL.

Parses the inventory with ObsPy and creates or updates Station rows. Also populates used_location_codes and used_channel_names from the channel-level inventory (requires level=channel in FDSN queries). Empty location codes are stored as "--" (MSNoise convention).

When save_to_response_path is True (default), the parsed inventory is written as a StationXML file into the project’s response_path directory (read from global config). This makes the instrument responses immediately available to the preprocessing step without any manual file copying. The file is named <NET>.<STA>.xml for single-network inventories, or inventory_<timestamp>.xml when the inventory spans multiple networks. Saving failures are logged as warnings but never abort the import.

Parameters:
  • session – SQLAlchemy session.

  • path_or_url – Path to a local StationXML file, or a URL (e.g. an FDSN station web service query URL with level=channel).

  • data_source_id – DataSource to assign to imported stations. None → project default (DataSource.ref=1).

  • save_to_response_path – Write the inventory to response_path after a successful import (default True).

Returns:

Tuple (created, updated, saved_path) where saved_path is the absolute path of the written file, or None if not saved.

msnoise.core.stations.list_data_sources(session)

Return all DataSource rows.

Parameters:

session – SQLAlchemy session.

Returns:

List of DataSource.

msnoise.core.stations.mark_data_availability(session, net, sta, flag)

Updates the flag of all DataAvailability objects matching net.sta in the database

Parameters:
  • session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()

  • net (str) – Network code

  • sta (str) – Station code

  • flag (str) – Status of the DataAvailability object: New, Modified or Archive. Values accepted are {‘N’, ‘M’, ‘A’}

msnoise.core.stations.read_waveforms_from_availability(session, da_records, t_start, t_end, logger=None)

Read waveforms from a list of DataAvailability records into a Stream.

Builds the full path for each record via get_waveform_path() (honouring the DataSource root), deduplicates paths, and reads each file sliced to [t_start, t_end] using ObsPy. Files that cannot be read are silently skipped with a debug log.

This is the canonical “DA records → waveform Stream” helper that any worker (PSD, future steps) should use instead of hand-rolling os.path.join(f.path, f.file).

Parameters:
  • session – SQLAlchemy session.

  • da_records – Iterable of DataAvailability ORM objects.

  • t_start – UTCDateTime start.

  • t_end – UTCDateTime end.

  • logger – Optional logging.Logger; uses module logger if None.

Returns:

Stream (may be empty).

msnoise.core.stations.resolve_data_source(session, station)

Return the effective DataSource for a station.

If the station has data_source_id set, that DataSource is returned. Otherwise the project default (ref=1) is used.

Parameters:
  • session – SQLAlchemy session.

  • station – Station ORM object.

Returns:

DataSource.

msnoise.core.stations.set_all_stations_source(session, data_source_id)

Assign a DataSource to every station in the database.

Parameters:
  • session – SQLAlchemy session.

  • data_source_id – Primary key of the DataSource. None reverts all to default.

msnoise.core.stations.set_network_source(session, net, data_source_id)

Assign a DataSource to all stations of a network.

Parameters:
  • session – SQLAlchemy session.

  • net – Network code.

  • data_source_id – Primary key of the DataSource. None reverts to default.

msnoise.core.stations.set_station_source(session, net, sta, data_source_id)

Assign a specific DataSource to a station.

Parameters:
  • session – SQLAlchemy session.

  • net – Network code.

  • sta – Station code.

  • data_source_id – Primary key of the DataSource to assign. Pass None to revert to the project default.

Raises:

ValueError – If the station or DataSource does not exist.

msnoise.core.stations.to_sds(stats, year, jday)

Build an SDS-format relative file path from ObsPy trace stats.

Returns a path string of the form:

YYYY/NET/STA/CHAN.D/NET.STA.LOC.CHAN.D.YYYY.DDD
Parameters:
  • stats – obspy.core.trace.Stats object.

  • year – 4-digit year integer.

  • jday – Julian day-of-year integer (1-366).

Returns:

Relative SDS path string.

msnoise.core.stations.update_data_availability(session, net, sta, loc, chan, path, file, starttime, endtime, data_duration, gaps_duration, samplerate, data_source_id=None)

Updates a DataAvailability object in the database

Parameters:
  • session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()

  • net (str) – The network code of the Station

  • sta (str) – The station code

  • loc (str) – The location code

  • chan (str) – The component (channel)

  • path (str) – Path to the folder containing the file, relative to DataSource.uri.

  • file (str) – The name of the file

  • starttime (datetime.datetime) – Start time of the file

  • endtime (datetime.datetime) – End time of the file

  • data_duration (float) – Cumulative duration of available data in the file

  • gaps_duration (float) – Cumulative duration of gaps in the file

  • samplerate (float) – Sample rate of the data in the file (in Hz)

  • data_source_id (int or None) – FK to DataSource. None → project default.

msnoise.core.stations.update_data_source(session, id, **kwargs)

Update fields on an existing DataSource.

Parameters:
  • session – SQLAlchemy session.

  • id – Primary key of the DataSource to update.

  • kwargs – Fields to update: name, uri, data_structure, auth_env.

Returns:

Updated DataSource.

Raises:

ValueError – If the DataSource does not exist.

msnoise.core.stations.update_station(session, net, sta, X, Y, altitude, coordinates='UTM', instrument='N/A', used=1)

Updates or Insert a new Station in the database.

See also

msnoise.msnoise_table_def.declare_tables.Station

Parameters:
  • session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()

  • net (str) – The network code of the Station

  • sta (str) – The station code

  • X (float) – The X coordinate of the station (Easting or Longitude)

  • Y (float) – The Y coordinate of the station (Northing or Latitude)

  • altitude (float) – The altitude of the station

  • coordinates (str) – The coordinate system. “DEG” is WGS84 latitude/longitude in degrees. “UTM” is expressed in meters.

  • instrument (str) – The instrument code, useful with PAZ correction

  • used (bool) – Whether this station must be used in the computations.

msnoise.core.stations.populate_stations(db, loglevel='INFO')

Scan the default DataSource archive and populate the Station table.

Walks the archive directory according to the configured data_structure (SDS, BUD, IDDS, PDF, or a custom format). Station coordinates are initialised to zero — use import_stationxml() afterwards to set real coordinates.

For custom data structures, a custom.py file in the current working directory must export a populate(data_folder) function that returns a {NET_STA: [net, sta, lon, lat, alt, coordinates]} dictionary.

Parameters:
  • db – SQLAlchemy session.

  • loglevel – Logging verbosity (default "INFO").

Returns:

True on success.

FDSN / EIDA Data Access

FDSN/EIDA waveform fetching and bulk SDS download for MSNoise.

This module is the only place in MSNoise that makes network calls to external data services. It provides two distinct modes of data access:

On-the-fly fetch (used by the preprocess step)

fetch_and_preprocess() / fetch_raw_waveforms() — called per station per day during the preprocess worker loop. Streams waveforms directly from FDSN into the processing pipeline without writing a local archive.

Bulk download (mass_download())

Wraps ObsPy’s MassDownloader to fetch all waveforms for the full project date range in one call, routing them into a day-aligned local SDS archive. Station gating is done server-side via an ObsPy Inventory built from the MSNoise station table — avoiding any NSLC cartesian blowup. After bulk download, run msnoise scan_archive to populate data availability before starting the pipeline.

Typical bulk-download sequence:

msnoise db init --from-yaml project.yaml
msnoise utils import-stationxml https://...   # populates locs/chans
msnoise utils download                        # calls mass_download()
msnoise scan_archive --init
msnoise utils run_workflow

SDS root resolution (in order):

  1. Explicit --sds-path CLI option.

  2. The single unambiguous local SDS DataSource URI in the database.

  3. ./SDS (with a warning).

StationXML files are written to <sds_root>/../stationxml/ and are never overwritten.

msnoise.core.fdsn.build_client(ds)

Build an ObsPy client for the given DataSource.

Parameters:

ds – DataSource ORM object.

Returns:

An ObsPy client with a get_waveforms_bulk method.

msnoise.core.fdsn.fetch_and_preprocess(db, jobs, goal_day, params, responses=None, loglevel='INFO', client=None)

Fetch waveforms from FDSN/EIDA for a batch of stations on one day.

Groups jobs by DataSource (all jobs in a batch share the same data_source_id when group_by="day_lineage_datasource" is used), issues one get_waveforms_bulk call, splits the result by station, and optionally writes raw files.

Parameters:
  • db – SQLAlchemy session.

  • jobs – List of Job ORM objects (all same day, same DataSource).

  • goal_day – Date string YYYY-MM-DD.

  • params – MSNoiseParams for this lineage.

  • responses – ObsPy Inventory for instrument response removal, or None.

  • loglevel – Logging level string.

Returns:

Tuple (stream, done_jobs, failed_jobs) where stream contains preprocessed traces for all successfully fetched stations.

msnoise.core.fdsn.fetch_raw_waveforms(db, jobs, goal_day, params, t_start=None, t_end=None, client=None)

Fetch raw (unprocessed) waveforms from FDSN/EIDA for a batch of stations.

Issues one get_waveforms_bulk call covering all stations in jobs, optionally writes a raw file cache (fdsn_keep_raw=Y), and returns the combined Stream. No preprocessing is applied — the caller receives exactly what the FDSN server returns.

Use this when downstream processing needs raw data (e.g. PSD computation via ObsPy PPSD, which handles its own response correction internally). For pre-processing + response removal use fetch_and_preprocess().

Parameters:
  • db – SQLAlchemy session.

  • jobs – List of Job ORM objects (all same DataSource).

  • goal_day – Date string YYYY-MM-DD.

  • params – MSNoiseParams for this lineage.

  • t_start – Optional UTCDateTime override for the fetch window start (default: midnight of goal_day).

  • t_end – Optional UTCDateTime override for the fetch window end (default: midnight + 86400 s).

Returns:

Stream (may be empty on failure).

msnoise.core.fdsn.fetch_waveforms_bulk(client, bulk_request, retries=3)

Issue a bulk waveform request with retry logic.

Parameters:
  • client – ObsPy FDSN/EIDA client.

  • bulk_request – List of (net, sta, loc, chan, t1, t2) tuples.

  • retries – Number of retry attempts for transient errors.

Returns:

Stream (may be empty on failure).

Raises:

Re-raises auth errors and no-data exceptions immediately.

msnoise.core.fdsn.is_remote_source(uri: str) → bool

Return True if the DataSource URI points to a remote service.

msnoise.core.fdsn.parse_datasource_scheme(uri: str) → str

Return the scheme of a DataSource URI.

Parameters:

uri – DataSource.uri string.

Returns:

One of "local", "sds", "fdsn", "eida".
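
A hypothetical re-implementation of the scheme rules listed above (parse_scheme_sketch is illustrative only):

```python
def parse_scheme_sketch(uri):
    # Map a DataSource URI to one of the four documented schemes.
    if uri.startswith("fdsn://"):
        return "fdsn"
    if uri.startswith("eida://"):
        return "eida"
    if uri.startswith("sds://"):
        return "sds"
    return "local"  # bare filesystem path
```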

msnoise.core.fdsn.get_auth(auth_env: str) → dict

Read credentials from environment variables for the given prefix.

Looks up: - {auth_env}_FDSN_USER - {auth_env}_FDSN_PASSWORD - {auth_env}_FDSN_TOKEN (path to EIDA token file, or token string)

Parameters:

auth_env – Env var prefix (e.g. "MSNOISE", "IRIS").

Returns:

Dict with keys user, password, token (any may be None).
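
The lookup can be sketched with os.environ (get_auth_sketch is a hypothetical stand-in for the real function):

```python
import os

def get_auth_sketch(auth_env):
    # Look up the three documented variables; missing ones stay None.
    return {key: os.environ.get(f"{auth_env}_FDSN_{key.upper()}")
            for key in ("user", "password", "token")}
```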

msnoise.core.fdsn.mass_download(session, schema, sds_root=None, startdate_override=None, enddate_override=None)

Download waveforms for all remote DataSources into an SDS archive.

Builds an ObsPy Inventory from the station table and passes it as limit_stations_to_inventory — the MassDownloader gates at station level server-side, avoiding any NSLC cartesian blowup. Channel codes are the union of all Station.chans().

chunklength_in_sec=86400 produces day-aligned SDS files. StationXML is stored under <sds_root>/../stationxml/ and never overwritten. sanitize=False prevents discarding traces for missing response.

Parameters:
  • session – SQLAlchemy session.

  • schema – Declared schema object (from declare_tables()).

  • sds_root – SDS write root as Path, or None to auto-resolve (see _resolve_sds_root()).

  • startdate_override – Override project startdate (YYYY-MM-DD).

  • enddate_override – Override project enddate (YYYY-MM-DD).

Raises:

ValueError – If no remote DataSource or no valid stations found.

exception msnoise.core.fdsn.FDSNConnectionError

Raised when an FDSN request fails due to a connection-level error.

Distinct from FDSNNoDataException (no matching data) and auth errors (wrong credentials). Callers that cache client objects should catch this, invalidate the cache, rebuild the client, and retry.

Project Archive I/O

Low-level I/O helpers for MSNoise project archives.

A project archive is a .tar.zst file containing a full MSNoise project at a given pipeline level (preprocess / cc / stack / dvv …). It is distinct from an MSNoiseResult result bundle (single lineage, params.yaml + _output/).

This module handles filesystem operations (extract, export, checksum, job reconstruction). Higher-level logic lives in msnoise.project.

Archive format

Every .tar.zst archive has this internal layout:

meta.yaml              ← {entry_level, msnoise_version, created_at, project_name}
MANIFEST.json          ← {relative_path: {sha256, size_bytes}} for every file
project.yaml           ← full MSNoise config (importable)
<lineage>/<step>/_output/...   ← pipeline outputs
<lineage>/<step>/params.yaml   ← per-lineage params (enables DB-free access)

Paths inside the archive are relative to the project root, so extraction into any directory produces a valid project tree.

bundle_pointer.yaml format

msnoise_version_min: "2.0.1"
created_at: "2026-04-01"
levels:
  stack:
    description: "Stacked CCFs + refstacks for all filter sets"
    url: "https://ftp.seismology.be/msnoise/study/level_stack.tar.zst"
    sha256: "a1b2c3d4…"
    size_gb: 6.8
  dvv:
    description: "Final DVV aggregates and per-pair series"
    url: "https://zenodo.org/record/XXXXXXX/files/level_dvv.tar.zst"
    sha256: "d4e5f6…"
    size_gb: 0.3

url must be a plain HTTPS URL; sha256 is the hash of the .tar.zst file itself (printed by export_project() on completion).

Typical workflow

Export:

from msnoise.core.project_io import export_project
sha = export_project("/path/to/project", "stack", "/data/level_stack.tar.zst")
# prints sha256 — paste into bundle_pointer.yaml

Import (Python):

from msnoise.core.project_io import import_project_archive
root = import_project_archive("bundle_pointer.yaml", "stack", "./my_project")

Import (CLI):

msnoise project import --from bundle_pointer.yaml --level stack \
    --project-dir ./my_project --with-jobs
msnoise.core.project_io.extract_archive(src: str | Path, dest: str | Path) → Path

Extract a .tar.zst project archive into dest.

The archive may contain files at the root or inside a single top-level directory — both layouts are handled transparently. The extracted tree is always rooted at dest (no extra nesting introduced).

Parameters:
  • src – Path to the .tar.zst file.

  • dest – Destination directory (created if absent).

Returns:

Absolute Path to the extracted project root (i.e. the directory that contains project.yaml).

Raises:
  • FileNotFoundError – if src does not exist.

  • ValueError – if src is not a recognised archive format.

msnoise.core.project_io.file_sha256(path: str | Path) → str

Return the hex SHA-256 digest of a file (streaming, memory-efficient).
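
A streaming SHA-256 can be sketched with hashlib, hashing in fixed-size chunks so arbitrarily large archives never have to fit in memory (a sketch, not the actual implementation):

```python
import hashlib

def file_sha256_sketch(path, chunk_size=1 << 20):
    # Hash in 1 MiB chunks; memory use is bounded by chunk_size.
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
```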

msnoise.core.project_io.scan_lineages(project_dir: str | Path, category: str) → list[tuple[list[str], Path]]

Locate all computed lineages for category under project_dir.

Scans the directory tree for folders named <category>_N that contain an _output subdirectory. The lineage names are reconstructed from the relative path segments between project_dir and the matched folder.

Parameters:
  • project_dir – MSNoise project root (contains project.yaml).

  • category – Category name without set number, e.g. "stack".

Returns:

List of (lineage_names, step_dir) tuples, one per match. lineage_names is ordered from root to leaf (e.g. ["global_1", "preprocess_1", "cc_1", "filter_1", "stack_1"]). step_dir is the absolute Path to the matched <category>_N folder.
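
The scan can be sketched with pathlib (a hypothetical re-implementation; the real function may apply extra filtering):

```python
import re
from pathlib import Path

def scan_lineages_sketch(project_dir, category):
    # Find <category>_N folders containing an _output subdirectory and
    # rebuild the lineage from the path segments below the project root.
    project_dir = Path(project_dir)
    pattern = re.compile(rf"^{re.escape(category)}_\d+$")
    matches = []
    for step_dir in project_dir.rglob(f"{category}_*"):
        if pattern.match(step_dir.name) and (step_dir / "_output").is_dir():
            lineage = list(step_dir.relative_to(project_dir).parts)
            matches.append((lineage, step_dir.resolve()))
    return matches
```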

msnoise.core.project_io.build_params_from_project_yaml(project_yaml_path: str | Path, lineage_names: list[str])

Build an MSNoiseParams from a project.yaml.

Reads project_yaml_path and extracts the config layers for each step in lineage_names. The returned object has the same structure as one produced by from_yaml().

Parameters:
  • project_yaml_path – Path to project.yaml.

  • lineage_names – Ordered list of step names, e.g. ["global_1", "preprocess_1", "stack_1"].

Returns:

MSNoiseParams with one layer per category in lineage_names.

msnoise.core.project_io.LEVEL_GLOBS: dict[str, list[str]] = {'cc': ['preprocess_*/cc_*/_output/**'], 'dvv': ['**/mwcs_dtt_dvv_[0-9]*/_output/**', '**/stretching_dvv_[0-9]*/_output/**', '**/wavelet_dtt_dvv_[0-9]*/_output/**'], 'mwcs': ['**/mwcs_[0-9]*/_output/**', '**/mwcs_dtt_[0-9]*/_output/**'], 'preprocess': ['preprocess_*/_output/**'], 'stack': ['**/filter_*/stack_*/_output/**', '**/filter_*/refstack_*/_output/**'], 'stretching': ['**/stretching_[0-9]*/_output/**'], 'wavelet': ['**/wavelet_[0-9]*/_output/**', '**/wavelet_dtt_[0-9]*/_output/**']}

Glob patterns (relative to project root) for each entry level. Each pattern addresses the _output/ directory of the relevant steps.

msnoise.core.project_io.LEVEL_CATEGORIES: dict[str, list[str]] = {'cc': ['cc'], 'dvv': ['mwcs_dtt_dvv', 'stretching_dvv', 'wavelet_dtt_dvv'], 'mwcs': ['mwcs', 'mwcs_dtt'], 'preprocess': ['preprocess'], 'stack': ['stack', 'refstack'], 'stretching': ['stretching'], 'wavelet': ['wavelet', 'wavelet_dtt']}

Categories whose outputs are present in each entry level. Used by reconstruct_jobs_from_filesystem() to know which steps to scan when inserting flag=D jobs.

msnoise.core.project_io.export_project(project_dir: str | Path, level: str, output_path: str | Path) → str

Export a project archive (.tar.zst) for the given entry level.

Collects all _output/ trees matching level (see LEVEL_GLOBS), generates a params.yaml alongside each matched step directory, writes meta.yaml and MANIFEST.json, and streams everything into a .tar.zst file.

Parameters:
  • project_dir – MSNoise project root (contains project.yaml).

  • level – Entry level — one of the keys in LEVEL_GLOBS.

  • output_path – Destination .tar.zst file path (created/overwritten).

Returns:

Hex SHA-256 of the written archive (paste into bundle_pointer.yaml).

Raises:
  • ValueError – if level is not a recognised entry level.

  • FileNotFoundError – if project.yaml is absent from project_dir.
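The returned digest is an ordinary hashlib-style hex SHA-256 of the archive bytes. For reference, here is how such a digest is computed over a file in streaming fashion (the archive content below is dummy data, not a real .tar.zst):

```python
import hashlib
import tempfile
from pathlib import Path

def sha256_of(path: Path, chunk: int = 1 << 20) -> str:
    """Hex SHA-256 of a file, read in chunks so large archives fit in memory."""
    h = hashlib.sha256()
    with path.open("rb") as fh:
        while block := fh.read(chunk):
            h.update(block)
    return h.hexdigest()

with tempfile.TemporaryDirectory() as tmp:
    archive = Path(tmp) / "project_stack.tar.zst"
    archive.write_bytes(b"not a real archive")
    digest = sha256_of(archive)

print(digest)  # 64 hex characters, ready to paste into bundle_pointer.yaml
```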

msnoise.core.project_io.reconstruct_jobs_from_filesystem(session, schema, level: str, root: str | Path) → int

Insert flag=D jobs by scanning the extracted _output/ tree.

After msnoise db init --from-yaml the jobs table is empty. This function synthetically populates it so that normal new_jobs propagation generates the correct downstream flag=T jobs.

Parameters:
  • session – SQLAlchemy session (DB must already be initialised).

  • schema – Return value of declare_tables().

  • level – Entry level string (key of LEVEL_CATEGORIES).

  • root – Project root directory.

Returns:

Total number of flag=D jobs inserted.

Raises:

ValueError – if level is unknown.
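Conceptually the scan is simple: for each category in LEVEL_CATEGORIES[level], find the step folders carrying an _output/ tree and emit one flag=D job per finished unit. A rough, self-contained sketch under simplified assumptions (one job per output file; the real function writes rows through SQLAlchemy, not a counter):

```python
import tempfile
from pathlib import Path

# Excerpt of the real LEVEL_CATEGORIES mapping, for illustration.
LEVEL_CATEGORIES = {"stack": ["stack", "refstack"]}

def count_done_jobs(root: Path, level: str) -> int:
    """Count one synthetic flag=D job per file found under a matching _output/."""
    if level not in LEVEL_CATEGORIES:
        raise ValueError(f"unknown level: {level!r}")
    done = 0
    for category in LEVEL_CATEGORIES[level]:
        for out_dir in root.glob(f"**/{category}_[0-9]*/_output"):
            done += sum(1 for p in out_dir.rglob("*") if p.is_file())
    return done

with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    out = root / "filter_1" / "stack_1" / "_output"
    out.mkdir(parents=True)
    (out / "2024-01-01.h5").touch()
    (out / "2024-01-02.h5").touch()
    n = count_done_jobs(root, "stack")

print(n)  # 2
```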

msnoise.core.project_io.export_project_levels(project_dir: str | Path, levels: str | list[str], output_dir: str | Path, url_base: str = '') → dict

Export one .tar.zst archive per entry level and write bundle_pointer.yaml.

Levels with no matching _output/ content are skipped silently.

Parameters:
  • project_dir – MSNoise project root.

  • levels – Level(s) to export: a single level name, a list, or "all" to attempt every level in LEVEL_GLOBS.

  • output_dir – Directory where archives and bundle_pointer.yaml are written (created if absent).

  • url_base – Optional URL prefix for the url fields in bundle_pointer.yaml. If supplied, URLs are formed as <url_base>/<filename>; if omitted, placeholder URLs are written instead.

Returns:

Dict mapping level name → {"path": Path, "sha256": str} for every exported archive.

Raises:

ValueError – if levels contains an unrecognised level name.
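The url handling can be illustrated in isolation: an entry gets <url_base>/<filename> when url_base is supplied, a placeholder otherwise. A hypothetical sketch of the pointer assembly; the field names and the placeholder text are invented, and the real bundle_pointer.yaml layout may differ:

```python
from pathlib import Path

def pointer_entries(exported: dict, url_base: str = "") -> dict:
    """Build bundle_pointer-style entries from export_project_levels-like output."""
    entries = {}
    for level, info in exported.items():
        name = Path(info["path"]).name
        entries[level] = {
            # placeholder string is invented for this sketch
            "url": f"{url_base}/{name}" if url_base else "FILL_ME_IN",
            "sha256": info["sha256"],
        }
    return entries

exported = {"stack": {"path": Path("out/proj_stack.tar.zst"), "sha256": "ab" * 32}}
entries = pointer_entries(exported, url_base="https://example.org/bundles")
print(entries["stack"]["url"])  # https://example.org/bundles/proj_stack.tar.zst
```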

msnoise.core.project_io.import_project_archive(pointer_path: str | Path, level: str | list[str], project_dir: str | Path) → Path

Download and extract one or more project archives from a bundle_pointer.yaml.

Parameters:
  • pointer_path – Path to bundle_pointer.yaml.

  • level – Entry level(s) to import. Pass a single string (e.g. "stack"), a list (["stack", "dvv"]), or "all" to download every level listed in the pointer.

  • project_dir – Destination directory (created if absent). All archives are extracted into the same directory — their _output/ trees never overlap.

Returns:

Absolute path to the extracted project root.

Raises:
  • KeyError – if a requested level is absent from bundle_pointer.yaml.

  • ValueError – if a downloaded archive fails the SHA-256 check.
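The checksum guard can be reproduced with the standard library alone. A sketch of the verify-then-extract step, assuming a plain tar for simplicity (real archives are zstd-compressed, which needs a third-party codec):

```python
import hashlib
import tarfile
import tempfile
from pathlib import Path

def verify_and_extract(archive: Path, expected_sha256: str, dest: Path) -> Path:
    """Refuse to extract an archive whose SHA-256 differs from the pointer's."""
    digest = hashlib.sha256(archive.read_bytes()).hexdigest()
    if digest != expected_sha256:
        raise ValueError(f"SHA-256 mismatch for {archive.name}")
    dest.mkdir(parents=True, exist_ok=True)
    with tarfile.open(archive, "r") as tar:
        tar.extractall(dest)  # real code should also sanitise member paths
    return dest.resolve()

with tempfile.TemporaryDirectory() as tmp:
    tmp = Path(tmp)
    payload = tmp / "_output"
    payload.mkdir()
    (payload / "ref.h5").touch()
    archive = tmp / "proj.tar"
    with tarfile.open(archive, "w") as tar:
        tar.add(payload, arcname="_output")
    expected = hashlib.sha256(archive.read_bytes()).hexdigest()
    project = verify_and_extract(archive, expected, tmp / "project")
    ok = (project / "_output" / "ref.h5").exists()

print(ok)  # True
```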