Core Modules

The msnoise.core package contains all internal implementation modules. Plugin authors should import from msnoise.plugins for a stable API.

Database & Connection

MSNoise database connection and logging utilities.

msnoise.core.db.connect(inifile=None)

Establishes a connection to the database and returns a Session object.

Parameters:

inifile (string) – The path to the db.ini file to use. Defaults to os.getcwd() + db.ini

Return type:

sqlalchemy.orm.session.Session

Returns:

Session object, needed for many of the other API methods.

msnoise.core.db.create_database_inifile(tech, hostname, database, username, password, prefix='')

Creates the db.ini file based on supplied parameters.

Parameters:
  • tech (int) – The database technology used: 1=sqlite, 2=mysql

  • hostname (string) – The hostname of the server (if tech=2) or the name of the sqlite file (if tech=1)

  • database (string) – The database name

  • username (string) – The user name

  • password (string) – The password of the user

  • prefix (string) – The prefix to use for all tables

Returns:

None

msnoise.core.db.get_engine(inifile=None)

Returns a SQLAlchemy Engine

Parameters:

inifile (str) – The path to the db.ini file to use. Defaults to os.getcwd() + db.ini

Return type:

sqlalchemy.engine.Engine

Returns:

An Engine Object

msnoise.core.db.get_logger(name, loglevel=None, with_pid=False)

Returns the currently configured logger or configures a new one.

msnoise.core.db.read_db_inifile(inifile=None)

Reads the parameters from the db.ini file.

Parameters:

inifile (string) – The path to the db.ini file to use. Defaults to os.getcwd() + db.ini

Return type:

collections.namedtuple

Returns:

tech, hostname, database, username, password

Configuration & Params

MSNoise configuration management, parameter merging, and plot filename helpers.

msnoise.core.config.build_plot_outfile(outfile, plot_name, lineage_names, *, pair=None, components=None, mov_stack=None, extra=None)

Resolve the output filename for a plot.

When outfile starts with "?", the "?" is replaced with a standardised auto-generated tag and plot_name is prepended, producing a filename of the form:

<plot_name>__<lineage>[__<pair>][__<comp>][__m<ms>][__<extra>].<ext>

If outfile does not start with "?", it is returned unchanged (the caller supplied an explicit path).

If outfile is None or empty, returns None.

Parameters:
  • outfile – Raw outfile argument passed to the plot function (e.g. "?.png").

  • plot_name – Short plot identifier, e.g. "ccftime" or "dvv_mwcs". Must not contain __.

  • lineage_names – List of step name strings (e.g. ["preprocess_1", "cc_1", "filter_1", "stack_1"]) from MSNoiseResult.lineage_names.

  • pair – Optional station pair string ("NET.STA.LOC-NET.STA.LOC"). Dots are kept; colons are replaced with -.

  • components – Optional component string, e.g. "ZZ".

  • mov_stack – Optional moving-stack tuple or string, e.g. ("1D", "1D") → "m1D-1D".

  • extra – Optional extra qualifier string appended last.

Returns:

Resolved filename string, or outfile unchanged.
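The "?" expansion above can be sketched as follows. This is a simplified illustration, not the real implementation: the actual function derives the lineage tag itself from lineage_names, and the lineage_tag argument here stands in for that tag.

```python
def build_plot_outfile(outfile, plot_name, lineage_tag,
                       pair=None, components=None, mov_stack=None, extra=None):
    """Sketch of the documented '?' expansion; lineage_tag stands in for the
    auto-generated tag the real function derives from lineage_names."""
    if not outfile:
        return None
    if not outfile.startswith("?"):
        return outfile  # caller supplied an explicit path, keep it unchanged
    ext = outfile[1:]  # keep whatever extension followed the '?', e.g. ".png"
    parts = [plot_name, lineage_tag]
    if pair:
        parts.append(pair.replace(":", "-"))  # dots kept, colons -> '-'
    if components:
        parts.append(components)
    if mov_stack:
        # accept either a ready-made "m1D-1D" string or a ("1D", "1D") tuple
        ms = mov_stack if isinstance(mov_stack, str) else "m" + "-".join(mov_stack)
        parts.append(ms)
    if extra:
        parts.append(extra)
    return "__".join(parts) + ext

print(build_plot_outfile("?.png", "ccftime", "pre1-cc1-f1-stk1",
                         pair="NE.STA1.00:NE.STA2.00", components="ZZ",
                         mov_stack=("1D", "1D")))
```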

msnoise.core.config.create_config_set(session, set_name)

Create a configuration set for a workflow step.

Parameters:
  • session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()

  • set_name (str) – The name of the workflow step (e.g., ‘mwcs’, ‘mwcs_dtt’, etc.)

Return type:

int or None

Returns:

The set_number if set was created successfully, None otherwise

msnoise.core.config.delete_config_set(session, set_name, set_number)

Delete a configuration set for a workflow step.

Parameters:
  • session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()

  • set_name (str) – The name of the workflow step (e.g., ‘mwcs’, ‘mwcs_dtt’, etc.)

  • set_number (int) – The set number to delete

Return type:

bool

Returns:

True if set was deleted successfully, False otherwise

msnoise.core.config.get_config(session, name=None, isbool=False, plugin=None, category='global', set_number=None)

Get the value of one or all config bits from the database.

Parameters:
  • session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()

  • name (str) – The name of the config bit to get. If omitted, a dictionary with all config items will be returned

  • isbool (bool) – if True, returns True/False for config name. Defaults to False

  • plugin (str) – if provided, gives the name of the Plugin config to use. E.g. if “Amazing” is provided, MSNoise will try to load the “AmazingConfig” entry point. See Extending MSNoise with Plugins for details.

Return type:

str, bool or dict

Returns:

the value for name or a dict of all config values

msnoise.core.config.get_config_categories_definition()

Return ordered display metadata for all workflow categories.

Each entry is a (category_key, display_name, level) tuple where level is the DAG depth (global=0, preprocess/psd=1 — both branch off global, …).

Derived from get_category_display_info() so plugin-added categories appear automatically. The list is ordered for UI rendering (CC branch first, PSD branch at end).

msnoise.core.config.get_config_set_details(session, set_name, set_number, format='list')

Get details of a specific configuration set.

Parameters:
  • session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()

  • set_name (str) – The name of the workflow step (e.g., ‘mwcs’, ‘mwcs_dtt’, etc.)

  • set_number (int) – The set number to inspect

  • format (str) – Output format (default ‘list’)

Return type:

list

Returns:

List of config entries in the set

msnoise.core.config.get_config_sets_organized(session)

Get configuration sets organized by category in the standard order

msnoise.core.config.get_merged_params_for_lineage(db, orig_params, step_params, lineage)

Build a MSNoiseParams for the given lineage.

Returns (lineage, lineage_names, MSNoiseParams).

msnoise.core.config.get_params(session)

Build a single-layer MSNoiseParams for global config.

Queries the Config table directly for all (category='global', set_number=1) rows and casts each value to its declared param_type. Does not depend on default.py.

For full pipeline params (per-step config included), use get_merged_params_for_lineage() instead.

Parameters:

session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()

Return type:

MSNoiseParams

msnoise.core.config.lineage_to_plot_tag(lineage_names)

Convert a list of step names to a compact plot filename tag.

Each step is abbreviated using _STEP_ABBREVS and the set number is appended. Steps are joined with -.

Examples:

lineage_to_plot_tag(["preprocess_1","cc_1","filter_1","stack_1"])
# → "pre1-cc1-f1-stk1"

lineage_to_plot_tag(["preprocess_1","cc_1","filter_1","stack_1",
                      "refstack_1","mwcs_1","mwcs_dtt_1","mwcs_dtt_dvv_1"])
# → "pre1-cc1-f1-stk1-ref1-mwcs1-dtt1-dvv1"
Parameters:

lineage_names – List of step name strings from MSNoiseResult.lineage_names.

Returns:

Hyphen-joined abbreviated tag string.
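The abbreviation logic implied by the examples above can be sketched as follows. The _STEP_ABBREVS subset shown here is inferred from those examples, not the complete built-in table (the real mapping comes from get_step_abbrevs() and includes plugin-provided entries):

```python
# Inferred subset of the abbreviation table, for illustration only.
_STEP_ABBREVS = {
    "preprocess": "pre", "cc": "cc", "filter": "f", "stack": "stk",
    "refstack": "ref", "mwcs": "mwcs", "mwcs_dtt": "dtt", "mwcs_dtt_dvv": "dvv",
}

def lineage_to_plot_tag(lineage_names):
    """Abbreviate each step name and append its set number, joined with '-'."""
    parts = []
    for name in lineage_names:
        category, set_number = name.rsplit("_", 1)  # "mwcs_dtt_1" -> ("mwcs_dtt", "1")
        parts.append(_STEP_ABBREVS.get(category, category) + set_number)
    return "-".join(parts)

print(lineage_to_plot_tag(["preprocess_1", "cc_1", "filter_1", "stack_1"]))
```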

msnoise.core.config.list_config_sets(session, set_name=None)

List all configuration sets, optionally filtered by category.

Parameters:
  • session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()

  • set_name (str) – If provided, only list sets for this category

Return type:

list

Returns:

List of tuples (category, set_number, entry_count)

msnoise.core.config.parse_config_key(key)

Parse a dot-notation config key into (category, set_number, name).

Accepted forms:

output_folder            →  ("global", 1, "output_folder")   # global shorthand
cc.cc_sampling_rate      →  ("cc",     1, "cc_sampling_rate")
cc.2.cc_sampling_rate    →  ("cc",     2, "cc_sampling_rate")
mwcs.2.mwcs_wlen         →  ("mwcs",   2, "mwcs_wlen")
Raises:

ValueError – for malformed keys (too many parts, non-integer set number, or bare name that is not a global parameter).
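The accepted forms above can be sketched as a small parser. This is an illustrative simplification: the real function validates bare names against the full set of global parameters, whereas the stub set here contains only one name.

```python
def parse_config_key(key, global_names=frozenset({"output_folder"})):
    """Sketch of the dot-notation key parser described above."""
    parts = key.split(".")
    if len(parts) == 1:
        # bare name: only valid for global parameters
        if parts[0] not in global_names:
            raise ValueError(f"{key!r} is not a global parameter")
        return ("global", 1, parts[0])
    if len(parts) == 2:
        category, name = parts
        return (category, 1, name)  # set number defaults to 1
    if len(parts) == 3:
        category, set_number, name = parts
        if not set_number.isdigit():
            raise ValueError(f"non-integer set number in {key!r}")
        return (category, int(set_number), name)
    raise ValueError(f"too many parts in {key!r}")

print(parse_config_key("cc.2.cc_sampling_rate"))
```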

msnoise.core.config.update_config(session, name, value, plugin=None, category='global', set_number=None)

Update one config bit in the database.

Parameters:
  • session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()

  • name (str) – The name of the config bit to set.

  • value (str) – The value of parameter name. Can also be NULL if you don’t want to use this particular parameter.

  • plugin (str) – if provided, gives the name of the Plugin config to use. E.g. if “Amazing” is provided, MSNoise will try to load the “AmazingConfig” entry point. See Extending MSNoise with Plugins for details.

  • category (str) – The config category (default ‘global’). Use e.g. ‘stack’, ‘filter’, ‘mwcs’ to target a specific config set.

  • set_number (int or None) – The config set number within the category (default None for global config). Use e.g. 1 for stack_1.

Workflow & Jobs

MSNoise workflow topology, job management, lineage resolution and scheduling.

msnoise.core.workflow.build_movstack_datelist(session)

Creates a date array for the analysis period. The returned tuple contains a start and an end date, and a list of individual dates between the two.

Parameters:

session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()

Return type:

tuple

Returns:

(start, end, datelist)

msnoise.core.workflow.build_ref_datelist(params, session=None)

Creates a date array for the REF. The returned tuple contains a start and an end date, and a list of individual dates between the two.

Parameters:
  • params (AttribDict) – A params object as obtained by get_params().

  • session (sqlalchemy.orm.session.Session, optional) – A Session object, as obtained by connect(). Required only when ref_begin is "1970-01-01" (auto-detect from data availability).

Return type:

tuple

Returns:

(start, end, datelist)

msnoise.core.workflow.compute_rolling_ref(data, ref_begin, ref_end)

Compute a per-index rolling reference from CCF data.

For each time index i, the reference is:

mean(data[i + ref_begin : i + ref_end])

Both ref_begin and ref_end must be negative integers with ref_begin < ref_end <= -1. Use ref_end=-1 to exclude the current window (compare to the previous N windows).

Uses min_periods=1 semantics: the first few steps receive whatever partial window is available rather than NaN.

Return type:

numpy.ndarray of shape (n_times, n_lag_samples). Row i is the reference for time step i.
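The windowing rule can be sketched as a plain numpy loop. This is illustrative, not the real (likely vectorised) implementation; in particular the empty-window fallback is one plausible reading of the min_periods=1 semantics for the very first steps.

```python
import numpy as np

def compute_rolling_ref(data, ref_begin, ref_end):
    """Sketch: per-index rolling reference. data has shape (n_times, n_lag);
    ref_begin < ref_end <= -1, both negative integers."""
    data = np.asarray(data, dtype=float)
    n_times = data.shape[0]
    ref = np.empty_like(data)
    for i in range(n_times):
        start = max(i + ref_begin, 0)  # clamp the window into range
        stop = max(i + ref_end, 0)
        window = data[start:stop]
        if window.shape[0] == 0:
            # min_periods=1-style fallback for the first steps: use the
            # earliest available row rather than returning NaN
            window = data[:1]
        ref[i] = window.mean(axis=0)
    return ref

data = np.arange(12.0).reshape(6, 2)
print(compute_rolling_ref(data, -3, -1))
```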

Create a link between two workflow steps

Create workflow links automatically between existing workflow steps, following the DAG defined by get_workflow_chains().

Linking rule: every step in a source category is linked to every step in each of its successor categories (ALL×ALL). Set numbers carry no topological meaning — all combinations are created and the user removes unwanted links via the admin UI. The DAG is determined entirely by WorkflowLink rows.

Returns:

tuple: (created_count, existing_count, error_message)

msnoise.core.workflow.create_workflow_step(session, step_name, category, set_number, description=None)

Create a new workflow step

msnoise.core.workflow.create_workflow_steps_from_config_sets(session)

Create workflow steps automatically from all existing config sets, sorted by natural workflow order.

Returns:

tuple: (created_count, existing_count, error_message)

msnoise.core.workflow.extend_days(days)

Return a DatetimeIndex from days extended by one extra day at the end.

Replaces the pandas 1.x pattern:

idx = pd.to_datetime(days)
idx = idx.append(pd.DatetimeIndex([idx[-1] + pd.Timedelta("1d")]))

which was removed in pandas 2.0.

Parameters:

days – sequence of date-like values (strings, dates, datetimes…)

Return type:

pandas.DatetimeIndex

msnoise.core.workflow.get_done_lineages_for_category(session, category)

Return all distinct computed lineages for a given workflow step category.

Queries Job rows whose associated WorkflowStep.category matches category and whose flag is 'D' (done), then de-duplicates and resolves each lineage string into an ordered list of step-name strings (upstream → downstream, including the step itself).

This is the correct way to enumerate output folders for a step that may be reached via multiple upstream paths (e.g. multiple filters, multiple MWCS configs), because it reflects what was actually computed rather than what the DAG topology suggests.

Example:

lineages = get_done_lineages_for_category(db, 'stretching')
# → [
#     ['preprocess_1', 'cc_1', 'filter_1', 'stack_1', 'stretching_1'],
#     ['preprocess_1', 'cc_1', 'filter_2', 'stack_1', 'stretching_1'],
# ]
Parameters:

category (str) – Workflow step category, e.g. 'stretching', 'mwcs_dtt', 'wavelet_dtt'.

Return type:

list[list[str]]

Returns:

Sorted list of unique lineage-name lists.

msnoise.core.workflow.get_filter_steps_for_cc_step(session, cc_step_id)

Get all filter steps that are children of a specific CC step.

Parameters:
  • session – Database session

  • cc_step_id – The step_id of the CC step

Returns:

List of filter workflow steps that are successors of the CC step

msnoise.core.workflow.get_job_types(session, jobtype)

Return job counts grouped by flag for a given jobtype string.

Works with the v2 workflow model where jobtype is a step name such as "cc_1". Returns a list of (count, flag) tuples.

Parameters:
  • session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()

  • jobtype (str) – The step name, e.g. "cc_1"

Return type:

list of (int, str)

Returns:

List of (count, flag) pairs

msnoise.core.workflow.get_lineages_to_step_id(session, step_id, include_self=True, max_depth=50, max_paths=5000)

Enumerate upstream lineages (all distinct paths) ending at step_id.

Returns a list of paths, each path being a list[WorkflowStep] ordered upstream -> downstream. This preserves branch structure (unlike get_upstream_steps_for_step_id which de-duplicates nodes).

Safety:
  • max_depth prevents infinite loops in case of bad/cyclic graphs

  • max_paths prevents combinatorial explosion

Parameters:
  • session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()

  • step_id (int) – The step at which the enumerated paths end

  • include_self (bool) – If True, each path ends with the step itself

  • max_depth (int) – Maximum traversal depth

  • max_paths (int) – Maximum number of paths to enumerate

Returns:

list[list[WorkflowStep]]
msnoise.core.workflow.get_next_job_for_step(session, step_category='preprocess', flag='T', group_by='day', limit_days=None, chunk_size=0)

Return a claimed batch of jobs for a workflow step category.

Note

Most callers should use get_next_lineage_batch() instead, which wraps this function and also resolves lineage strings, params, and station-pair metadata into a ready-to-use batch dict.

group_by:
  • “day”: claim all jobs for the selected (step_id, jobtype, day)

  • “pair”: claim all jobs for the selected (step_id, jobtype, pair)

  • “pair_lineage”: claim all jobs for the selected (step_id, jobtype, pair, lineage)

  • “day_lineage”: claim all jobs for the selected (step_id, jobtype, day, lineage)

chunk_size:

Only applies to group_by="day_lineage". When > 0, at most chunk_size jobs are claimed per batch instead of the full day. Useful for steps with O(N²) work per day (CC) or large per-day station counts (PSD) so that multiple workers can share the same day in parallel without write conflicts.

chunk_size=0 (default) claims everything — identical to the original behaviour.

msnoise.core.workflow.get_next_lineage_batch(db, step_category, group_by='pair_lineage', loglevel='INFO', day_value=None, chunk_size=0)

Standard worker prolog for lineage-aware steps.

  • Claims the next batch of jobs for step_category using get_next_job_for_step.

  • Extracts (pair, lineage_str, refs, days).

  • Loads current step config (from the job row).

  • Resolves lineage_str -> WorkflowStep objects.

  • Builds a MSNoiseParams for that lineage (one layer per category).

Parameters:
  • chunk_size (int, optional) – Passed to get_next_job_for_step(). Only effective for group_by="day_lineage". When > 0, at most chunk_size jobs are claimed per batch, enabling multiple workers to share the same day in parallel. Default 0 = claim everything (original behaviour).

Returns:
dict with keys:
  • jobs, step

  • pair, lineage_str, lineage_steps

  • lineage_names — full list including current step name

  • lineage_names_upstream — full minus current step (replaces manual [:-1])

  • lineage_names_mov — upstream with any refstack_* entries stripped

    (used by mwcs/wct/stretching to find MOV CCFs)

  • refs, days

  • step_params, params — params is a MSNoiseParams instance

or None if no jobs were claimed (caller should continue/sleep).
msnoise.core.workflow.get_refstack_lineage_for_filter(session, filterid, refstack_set_number=1)

Get the full lineage path from root through a filter step to its refstack.

Refstack is now a sibling of stack (both children of the filter pass-through). Returns the path ending at refstack_M directly downstream of filter_{filterid} — suitable for xr_get_ref().

Example for the default single-pipeline:

get_refstack_lineage_for_filter(db, 1)
# → ['preprocess_1', 'cc_1', 'filter_1', 'refstack_1']
Parameters:
  • filterid (int) – The filter set_number (e.g. 1 for filter_1).

  • refstack_set_number (int) – Which refstack set to use (default 1).

Return type:

list of str

msnoise.core.workflow.get_t_axis(params)

Returns the time axis (in seconds) of the CC functions.

Return type:

numpy.array

Returns:

the time axis in seconds

msnoise.core.workflow.get_workflow_job_counts(db)

Get job counts by status for the workflow

Parameters:

db – Database connection

Returns:

Dictionary with job counts by status

Get all links in a workflow

msnoise.core.workflow.get_workflow_steps(session)

Get all steps in a workflow

msnoise.core.workflow.is_next_job_for_step(session, step_category='preprocess', flag='T')

Are there any workflow-aware Jobs in the database with the specified flag and step category?

Parameters:
  • session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()

  • step_category (str) – Workflow step category (e.g., “preprocess”, “qc”)

  • flag (str) – Status of the Job: “T”odo, “I”n Progress, “D”one.

Return type:

bool

Returns:

True if at least one Job matches the criteria, False otherwise.

msnoise.core.workflow.lineage_str_to_step_names(lineage_str, sep='/')

Convert a lineage string like “preprocess_1/cc_1/filter_1” into a list of step_name strings, preserving order.

msnoise.core.workflow.lineage_str_to_steps(session, lineage_str, sep='/', strict=True)

Resolve a lineage string to a list of WorkflowStep ORM objects (ordered).

Parameters:
  • session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()

  • lineage_str (str) – e.g. “preprocess_1/cc_1/filter_1”

  • sep (str) – Separator used in lineage strings.

  • strict (bool) – If True, raises if any step_name can’t be resolved. If False, silently skips missing steps.

Returns:

list[WorkflowStep]
msnoise.core.workflow.massive_insert_job(session, jobs)

Bulk-insert a list of job dicts into the jobs table.

Each dict must contain at minimum day, pair, jobtype, flag and lastmod keys. Optional keys: step_id, priority, lineage.

Parameters:
  • session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()

  • jobs (list) – A list of job dicts to bulk-insert
msnoise.core.workflow.massive_update_job(session, jobs, flag='D')

Low-level routine to update a list of Jobs much faster than one-by-one updates. This method uses Job.ref, which is unique.

Parameters:
  • session (sqlalchemy.orm.session.Session) – The database connection object

  • jobs (list or tuple) – A list of Jobs to update

  • flag (str) – The destination flag

msnoise.core.workflow.propagate_downstream(session, batch: dict) → int

Propagate a just-completed batch to all immediate downstream worker steps.

Called immediately after massive_update_job() marks a batch Done, only when params.global_.hpc is False (the default). In HPC mode the operator runs msnoise new_jobs --after <category> manually.

Delegates to the specialised propagate_* functions from s02_new_jobs for transitions that require non-trivial logic (fan-outs, sentinel jobs, etc.). For simple pair-preserving transitions it performs a targeted bulk upsert keyed on the batch’s exact (pair, day) tuples.

Parameters:
  • session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()

  • batch (dict) – Return value of get_next_lineage_batch().

Returns:

int – Total jobs created or bumped to T.

msnoise.core.workflow.refstack_is_rolling(params)

Return True if the refstack configset uses rolling-index mode.

Rolling mode is indicated by ref_begin being a negative integer string (e.g. "-5"). In this mode no REF file is written to disk; the reference is computed on-the-fly at MWCS/stretching/WCT time via compute_rolling_ref().

Parameters:

params (obspy.core.AttribDict) – Merged parameter set containing ref_begin.

Return type:

bool
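The detection rule above reduces to "does ref_begin parse as a negative integer". A minimal sketch (taking ref_begin directly rather than the merged params object the real function receives):

```python
def refstack_is_rolling(ref_begin):
    """Sketch: rolling mode iff ref_begin is a negative-integer string
    like "-5"; date strings like "1970-01-01" are fixed-date mode."""
    try:
        return int(str(ref_begin)) < 0
    except ValueError:
        return False

print(refstack_is_rolling("-5"), refstack_is_rolling("1970-01-01"))
```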

msnoise.core.workflow.refstack_needs_recompute(session, pair, cc_lineage_prefix, params)

Return True if the REF stack needs to be recomputed for this pair.

Queries Done stack jobs for pair across all active stack_N steps whose lineage starts with cc_lineage_prefix (e.g. ["preprocess_1", "cc_1", "filter_1"]), and checks whether any date falls inside the configured [ref_begin, ref_end] window.

Stack and refstack are now siblings (both children of filter/cc), so the refstack worker no longer has a stack step in its upstream lineage. Instead, the cc/filter prefix is shared by both branches and is used here to locate the sibling stack jobs.

Should only be called for Mode A (fixed-date) refstack jobs.

Parameters:
  • session – SQLAlchemy session.

  • pair – Station pair string "NET.STA.LOC:NET.STA.LOC".

  • cc_lineage_prefix – Lineage name list up to (but not including) the stack/refstack level, e.g. ["preprocess_1", "cc_1", "filter_1"]. Pass batch["lineage_names_upstream"] from the refstack worker — that list ends at filter_N (the pass-through above refstack).

  • params – MSNoiseParams for this lineage.

Return type:

bool

msnoise.core.workflow.reset_jobs(session, jobtype, alljobs=False, reset_i=True, reset_e=True, downstream=False)

Reset jobs back to ``”T”``odo.

jobtype can be:

  • A specific step name, e.g. "cc_1" — resets only that step.

  • A category name, e.g. "cc" — resets all active steps in that category (cc_1, cc_2, …).

If downstream is True, all steps reachable via WorkflowLink from the resolved step(s) are also reset (breadth-first, respects is_active).

Parameters:
  • session (sqlalchemy.orm.session.Session) – A Session object

  • jobtype (str) – Step name or category name to reset.

  • alljobs (bool) – If True reset all jobs regardless of current flag; otherwise only resets jobs flagged "I" and/or "E".

  • reset_i (bool) – Reset “I”n-progress jobs (default True).

  • reset_e (bool) – Reset “E”rror/failed jobs (default True).

  • downstream (bool) – Also reset every step reachable downstream via active WorkflowLink rows (default False).

msnoise.core.workflow.resolve_lineage_params(session, lineage_names)

Resolve a lineage name-list into a fully merged params object.

Given a list of step-name strings (as returned by get_done_lineages_for_category()), resolves them to WorkflowStep ORM objects and merges every step’s configuration into the global params, exactly as the processing steps themselves do via get_next_lineage_batch().

Returns (lineage_steps, lineage_names, params) — the same tuple as get_merged_params_for_lineage() — so callers can use params directly (it will have components_to_compute, mov_stack, etc.).

Example:

lineage_names = get_done_lineages_for_category(db, 'mwcs_dtt')[0]
_, _, params = resolve_lineage_params(db, lineage_names)
mov_stack = params.stack.mov_stack[0]
Parameters:

lineage_names – Ordered list of step-name strings, e.g. ['preprocess_1', 'cc_1', 'filter_1', 'stack_1', 'mwcs_1', 'mwcs_dtt_1'].

Return type:

tuple(list, list[str], MSNoiseParams)

msnoise.core.workflow.update_job(session, day, pair, jobtype, flag, step_id=None, priority=0, lineage=None, commit=True, returnjob=True, ref=None)

Updates or Inserts a Job in the database. Workflow-aware: handles step_id and lineage fields.

Parameters:
  • session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()

  • day (str) – The day in YYYY-MM-DD format

  • pair (str) – the name of the station pair

  • jobtype (str) – Job type string, e.g. "preprocess_1"

  • flag (str) – Status of the Job: “T”odo, “I”n Progress, “D”one, “F”ailed.

  • step_id (int or None) – WorkflowStep primary key, or None

  • priority (int) – Job priority (default 0)

  • lineage (str or None) – Lineage string encoding upstream configset chain

  • commit (bool) – Whether to directly commit (True, default) or not (False)

  • returnjob (bool) – Return the modified/inserted Job (True, default) or not (False)

  • ref (int or None) – If provided, look up the job by its primary key instead of (day, pair, jobtype, lineage).

Return type:

Job or None

Returns:

If returnjob is True, returns the modified/inserted Job.

msnoise.core.workflow.filter_within_daterange(date, start_date, end_date)

Check if a date falls within the configured range

msnoise.core.workflow.get_first_runnable_steps_per_branch(session, source_step_id, skip_categories=None)

Return the first runnable step on each outgoing branch from source_step_id, skipping steps in skip_categories (default: {“filter”}).

  • “Branch” roots are the immediate successors of source_step_id.

  • If skipped nodes fan out, each fan-out is treated as a separate branch.

  • Results are deduped (set behavior) and returned in stable order.

msnoise.core.workflow.get_step_successors(session, step_id)

Get all steps that this step feeds into

msnoise.core.workflow.get_workflow_graph(session)

Return workflow as nodes and edges for visualization, sorted by workflow order

msnoise.core.workflow.get_workflow_chains()

Return the full workflow adjacency map, including any plugin extensions.

Merges the built-in topology with addenda registered via the msnoise.plugins.workflow_chains entry point group. Each registered callable must return a dict of the same schema as the built-in chains:

{
    "my_step": {
        "next_steps": ["other_step"],
        "is_entry_point": False,
        "is_terminal": False,
    }
}

Plugin entries are merged in load order; later plugins can override earlier ones for the same category key.

Return type:

dict[str, dict]
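The merge semantics described above (providers applied in load order, later entries overriding earlier ones for the same category key) amount to successive dict updates. A minimal sketch with a hypothetical plugin provider:

```python
def merge_workflow_chains(builtin_chains, plugin_providers):
    """Sketch of the entry-point merge: each provider is a zero-argument
    callable returning a chains dict of the documented schema."""
    chains = dict(builtin_chains)      # start from the built-in topology
    for provider in plugin_providers:  # load order
        chains.update(provider())      # later plugins override same keys
    return chains

builtin = {
    "cc": {"next_steps": ["filter"], "is_entry_point": False, "is_terminal": False},
}
plugin = lambda: {
    "my_step": {"next_steps": [], "is_entry_point": False, "is_terminal": True},
}
merged = merge_workflow_chains(builtin, [plugin])
print(sorted(merged))
```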

msnoise.core.workflow.get_workflow_order()

Return the canonical category display order, including plugin extensions.

Merges the built-in order with any extra categories registered via the msnoise.plugins.workflow_order entry point group. Each registered callable must return a list of category name strings in the desired insertion order (appended after the built-in list).

Return type:

list[str]

msnoise.core.workflow.get_step_abbrevs()

Return a mapping of workflow category → short plot-filename abbreviation.

Merges the built-in abbreviations (from _BUILTIN_WORKFLOW_CHAINS) with any abbrev keys provided by plugin entry points via msnoise.plugins.workflow_chains. Falls back to the raw category name for any category without an abbrev entry.

Return type:

dict[str, str]

msnoise.core.workflow.get_category_display_info()

Return ordered display metadata for all workflow categories.

Each entry is a dict with keys category, display_name, level (DAG depth: global=0, preprocess/psd=1, cc=2, …). Ordered for UI rendering: the main CC branch first (preprocess→cc→…→dvv), then the PSD branch (psd→psd_rms). Plugin categories are appended at the end.

Return type:

list[dict]

msnoise.core.workflow.is_terminal_category(category)

Return True if category is a terminal workflow step (no successors).

Terminal categories are those with an empty next_steps list in get_workflow_chains(). Examples: psd_rms, mwcs_dtt_dvv.

Parameters:

category – Workflow category string.

Return type:

bool

msnoise.core.workflow.is_entry_category(category)

Return True if category is a DAG entry point (no predecessors).

Entry categories are those with is_entry_point: True in get_workflow_chains(). Currently only global.

Parameters:

category – Workflow category string.

Return type:

bool

msnoise.core.workflow.query_active_steps(session, categories)

Return all active WorkflowStep rows whose category is in categories.

Parameters:

categories – A single category string or an iterable of strings.

Return type:

list[WorkflowStep]

msnoise.core.workflow.resolve_lineage_ids(session, lids)

Batch-resolve a set of lineage_ids to their lineage strings.

Parameters:

lids – Iterable of integer lineage_ids (None values are skipped).

Returns:

{lineage_id: lineage_str} for every id that exists in the DB.

Return type:

dict[int, str]

msnoise.core.workflow.bulk_upsert_jobs(session, to_insert, to_bump_refs, now, *, bump_flag_filter=None)

Bulk-insert new jobs and bump existing ones back to "T".

Parameters:
  • to_insert – List of job dicts (keys: day, pair, jobtype, step_id, priority, flag, lastmod, lineage_id).

  • to_bump_refs – List of Job.ref primary-key values to reset to T.

  • now – datetime used for lastmod on bumped rows.

  • bump_flag_filter – If given, only rows whose current flag equals this value are bumped. Pass None to bump unconditionally.

Returns:

(n_inserted, n_bumped)

Return type:

tuple[int, int]

msnoise.core.workflow.get_category_cli_command(category: str) → list | None

Return the msnoise sub-command token list for category, or None.

Reads the "cli" key from get_workflow_chains() so that the mapping stays in the single source of truth. None means the category is a pass-through (no worker, no jobs).

Plugin packages extend the mapping by adding a "cli" key to the dict returned by their msnoise.plugins.workflow_chains entry point — no separate entry point is needed.

Parameters:

category – Workflow category string (e.g. "cc", "stack").

Return type:

list[str] or None

msnoise.core.workflow.run_workflow_plan(session, *, threads: int = 1, hpc: bool | None = None, from_category: str | None = None, until_category: str | None = None, chunk_size: int = 0) → list

Build an ordered execution plan for all active workflow steps.

Performs a topological sort of active WorkflowStep categories via WorkflowLink rows and returns an ordered list of RunStep namedtuples. Pass-through categories (filter, global) are omitted. Categories with no runnable command (see get_category_cli_command()) are also omitted.

The returned plan is pure data — no subprocesses are started. The CLI command msnoise utils run_workflow drives the actual execution.

Parameters:
  • session – SQLAlchemy session.

  • threads – Number of parallel worker threads/processes (-t N).

  • hpc – Override hpc flag from config. If None, reads from params.global_.hpc.

  • from_category – Skip all categories before this one (inclusive start).

  • until_category – Stop after this category (inclusive end).

  • chunk_size – The --chunk-size value passed to cc compute and qc compute_psd.

Returns:

Ordered list of RunStep namedtuples.

Return type:

list[RunStep]

I/O

MSNoise xarray I/O for all result types (CCF, MWCS, DTT, STR, WCT, DVV, PSD).

msnoise.core.io.aggregate_dvv_pairs(root, parent_lineage, parent_step_name, parent_category: str, mov_stack, component: str, pair_type: str, pairs, params) → Dataset

Aggregate per-pair DTT/STR/WCT-DTT results into network-level dv/v statistics.

Reads all per-pair output files for the given (mov_stack, component) combination, extracts the appropriate dv/v and error columns per pair type, applies quality filtering, then computes across-pair statistics at each time step.

Parameters:
  • root – Output folder root.

  • parent_lineage – Lineage name list up to and including the parent DTT step (e.g. ["preprocess_1", ..., "mwcs_dtt_1"]).

  • parent_step_name – Step name of the parent DTT step.

  • parent_category – "mwcs_dtt", "stretching", or "wavelet_dtt".

  • mov_stack – Tuple (window, step) e.g. ("1D", "1D").

  • component – Component string e.g. "ZZ".

  • pair_type – "CC", "SC", "AC", or "ALL".

  • pairs – Iterable of (sta1, sta2) SEED-id string tuples.

  • params – Params object from get_params().

Returns:

xarray.Dataset with dim times and stat variables.

Raises:

ValueError – if no data files are found for the given combination.

msnoise.core.io.psd_ppsd_to_dataframe(ppsd)

Convert an ObsPy PPSD object to a pandas.DataFrame.
msnoise.core.io.psd_ppsd_to_dataset(ppsd)

Convert an ObsPy PPSD object to an xarray.Dataset.

Builds the same data as psd_ppsd_to_dataframe() but returns an xr.Dataset with a PSD variable of dims (times, periods) — ready to pass directly to xr_save_psd() without a DataFrame round-trip.

Parameters:

ppsd – PPSD object.

Returns:

xarray.Dataset.

msnoise.core.io.psd_read_results(net, sta, loc, chan, datelist, format='PPSD', use_cache=True)
msnoise.core.io.save_daily_ccf(root, lineage, step_name, station1, station2, components, date, corr, taxis)

Save a daily-stacked CCF to NetCDF using xr_save_ccf_daily().

Replaces the legacy add_corr() / _export_mseed / _export_sac pipeline. Output lives at:

<root>/<lineage>/<step_name>/_output/daily/<components>/<sta1>_<sta2>/<date>.nc
Parameters:
  • root – Output folder (params.global_.output_folder).

  • lineage – List of step-name strings for the lineage path.

  • step_name – Current step name (e.g. "cc_1").

  • station1 – First station SEED id NET.STA.LOC.

  • station2 – Second station SEED id NET.STA.LOC.

  • components – Component pair string e.g. "ZZ".

  • date – Calendar date (datetime.date or ISO string).

  • corr – 1-D numpy array, the stacked CCF.

  • taxis – 1-D lag-time axis array.

msnoise.core.io.xr_get_ccf(root, lineage, station1, station2, components, mov_stack, taxis)

Load CCF results from a NetCDF file.

Returns:

xarray.DataArray with dims (times, taxis), lazily memory-mapped. Call .load() or .values when you need numpy.

Raises:

FileNotFoundError – if the NetCDF file does not exist.

Note

xr_get_ccf (readers: s05/s08/s10) and xr_save_ccf (writer: s04) operate on strictly disjoint workflow steps — the file is never read and written concurrently, so keeping the handle lazy is safe.

msnoise.core.io.xr_get_ccf_all(root, lineage, step_name, station1, station2, components, date)

Load all windowed CCFs for one day written by xr_save_ccf_all().

Returns:

xarray.DataArray with dims (times, taxis).

Raises:

FileNotFoundError – if the NetCDF file does not exist.

msnoise.core.io.xr_get_ccf_daily(root, lineage, step_name, station1, station2, components, date)

Load a single daily-stacked CCF written by xr_save_ccf_daily().

Returns:

xarray.DataArray with dim taxis.

Raises:

FileNotFoundError – if the NetCDF file does not exist.

msnoise.core.io.xr_get_dtt(root, lineage, station1, station2, components, mov_stack)

Load DTT results from a NetCDF file.

Returns:

xarray.Dataset with a DTT variable and dims (times, keys).

Raises:

FileNotFoundError – if the NetCDF file does not exist.

msnoise.core.io.xr_get_dvv_agg(root, lineage, step_name, mov_stack, pair_type: str, component: str)

Load a DVV aggregate result from a NetCDF file.

Returns:

xarray.Dataset (lazy).

Raises:

FileNotFoundError – if the file does not exist.

msnoise.core.io.xr_get_dvv_pairs(root, lineage, step_name, mov_stack, pair_type: str, component: str)

Load per-pair dv/v time series from a NetCDF file.

Returns:

xarray.Dataset with dims (pair, times) and variables dvv and err.

Raises:

FileNotFoundError – if the file does not exist.

msnoise.core.io.xr_get_mwcs(root, lineage, station1, station2, components, mov_stack)

Load MWCS results from a NetCDF file.

Returns:

xarray.Dataset with a MWCS variable and dims (times, taxis, keys).

Raises:

FileNotFoundError – if the NetCDF file does not exist.

msnoise.core.io.xr_get_ref(root, lineage, station1, station2, components, taxis, ignore_network=False)
msnoise.core.io.xr_get_wct_dtt(root, lineage, station1, station2, components, mov_stack)

Load per-pair WCT dt/t results from a NetCDF file.

Returns an xarray.Dataset with variables DTT, ERR, COH each of shape (times, frequency), as written by xr_save_wct_dtt().

Raises:

FileNotFoundError – if the NetCDF file does not exist.

Return type:

xarray.Dataset

msnoise.core.io.xr_load_ccf_for_stack(root, lineage_names, station1, station2, components, dates)

Load per-day CCF data for stacking — the higher-level replacement for get_results_all.

Reads the daily CCF NetCDF files written by s03_compute_no_rotation() for the requested station pair, component and date list. Tries keep_all (per-window, _output/all/) first; falls back to keep_days (daily stacks, _output/daily/) if the former are absent.

Path layout (written by s03):

<root> / *cc_lineage / filter_step / _output / all|daily / <comp> / <sta1>_<sta2> / <date>.nc
Parameters:
  • root – Output folder (params.global_.output_folder).

  • lineage_names – Full lineage name list including the filter step, e.g. ["preprocess_1", "cc_1", "filter_1", "stack_1"].

  • station1 – First station SEED id NET.STA.LOC.

  • station2 – Second station SEED id NET.STA.LOC.

  • components – Component pair string e.g. "ZZ".

  • dates – Iterable of datetime.date or ISO date strings.

Returns:

xarray.Dataset with variable CCF and dims (times, taxis), sorted by times. Empty Dataset if no data found.

msnoise.core.io.xr_load_psd(root, lineage, step_name, seed_id, day)

Load a daily PSD NetCDF written by xr_save_psd().

Parameters:
format (str) – "dataframe" returns a DataFrame or None if file not found. "dataset" (default) returns an xarray.Dataset or None.

msnoise.core.io.xr_load_rms(root, lineage, step_name, seed_id)

Load per-station PSD RMS results from a NetCDF file.

Parameters:
format (str) – "dataframe" returns a DataFrame or None if file not found. "dataset" (default) returns an xarray.Dataset or None.

msnoise.core.io.xr_load_wct(root, lineage, station1, station2, components, mov_stack)

Load WCT results from a NetCDF file.

Returns:

xarray.Dataset with variables WXamp, Wcoh, WXdt and dims (times, freqs, taxis).

Raises:

FileNotFoundError – if the NetCDF file does not exist.

Pure read — xr_save_wct_dtt() writes to a different path, so keeping this handle lazy is safe.

msnoise.core.io.xr_save_ccf(root, lineage, step_name, station1, station2, components, mov_stack, taxis, new, overwrite=False)
msnoise.core.io.xr_save_ccf_all(root, lineage, step_name, station1, station2, components, date, window_times, taxis, corrs)

Save all windowed CCFs for one day as a single NetCDF file.

Replaces the legacy HDF5-based export_allcorr. One file per calendar day — safe for concurrent workers since each worker owns exactly one day.

Path layout:

<root>/<lineage>/<step_name>/_output/all/<components>/<sta1>_<sta2>/<date>.nc
Parameters:
  • window_times – 1-D array of window start times (datetime strings or datetime objects) — the times dimension.

  • taxis – 1-D lag-time axis array — the taxis dimension.

  • corrs – 2-D numpy array (n_windows, n_taxis) of CCF data.

msnoise.core.io.xr_save_ccf_daily(root, lineage, step_name, station1, station2, components, date, taxis, corr)

Save a single daily-stacked CCF as a per-day NetCDF file.

One file per calendar day — safe for concurrent workers since each worker owns exactly one day.

Path layout:

<root>/<lineage>/<step_name>/_output/daily/<components>/<sta1>_<sta2>/<YYYY-MM-DD>.nc
Parameters:
  • corr – 1-D numpy array of the stacked CCF.

  • taxis – 1-D lag-time axis array.

  • datedatetime.date or ISO string "YYYY-MM-DD".

msnoise.core.io.xr_save_dtt(root, lineage, step_name, station1, station2, components, mov_stack, dataset)

Save DTT results to a NetCDF file.

Parameters:

datasetxarray.Dataset with a DTT variable and dims (times, keys), as built by s06_compute_mwcs_dtt.

msnoise.core.io.xr_save_dvv_agg(root, lineage, step_name, mov_stack, pair_type: str, component: str, dataset)

Save a DVV aggregate result to a NetCDF file.

Path layout:

<root>/<lineage>/<step_name>/_output/<mov_stack>/dvv_<pair_type>_<component>.nc
Parameters:

datasetxarray.Dataset with dim times and stat variables as built by aggregate_dvv_pairs().

msnoise.core.io.xr_save_dvv_pairs(root, lineage, step_name, mov_stack, pair_type: str, component: str, dataset)

Save per-pair dv/v time series to a NetCDF file.

Path layout:

<root>/<lineage>/<step_name>/_output/<mov_stack>/dvv_pairs_<pair_type>_<component>.nc
Parameters:

datasetxarray.Dataset with dims (pair, times) and variables dvv and err, as built by aggregate_dvv_pairs().

msnoise.core.io.xr_save_mwcs(root, lineage, step_name, station1, station2, components, mov_stack, taxis, dataset)

Save MWCS results to a NetCDF file.

Parameters:

datasetxarray.Dataset with a MWCS variable and dims (times, taxis, keys), as built by s05_compute_mwcs.

msnoise.core.io.xr_save_psd(root, lineage, step_name, seed_id, day, dataset)

Save a daily PSD result to a NetCDF file.

Path layout:

<root>/<lineage>/<step_name>/_output/daily/<seed_id>/<YYYY-MM-DD>.nc
Parameters:

datasetxarray.Dataset with a PSD variable and dims (times, periods), as built by psd_ppsd_to_dataset().

msnoise.core.io.xr_save_ref(root, lineage, step_name, station1, station2, components, taxis, new, overwrite=False)
msnoise.core.io.xr_save_rms(root, lineage, step_name, seed_id, dataframe)

Save per-station PSD RMS results to a NetCDF file.

Accepts either a DataFrame (legacy, index = DatetimeIndex, columns = band labels) or an xarray.Dataset with a RMS variable and dims (times, bands).

msnoise.core.io.xr_save_stretching(root, lineage, step_name, station1, station2, components, mov_stack, dataset)

Save per-pair stretching results to a NetCDF file.

Path layout:

<root>/<lineage>/<step_name>/_output/<mov_stack[0]>_<mov_stack[1]>/<components>/<sta1>_<sta2>.nc
Parameters:

datasetxarray.Dataset with a STR variable and dims (times, keys) where keys = ['Delta', 'Coeff', 'Error'], as built by s10_stretching.

msnoise.core.io.xr_save_wct(root, lineage, step_name, station1, station2, components, mov_stack, dataset)

Save WCT results to a NetCDF file.

Parameters:

datasetxarray.Dataset with variables WXamp, Wcoh, WXdt and dims (times, freqs, taxis), as built by s08_compute_wct.

msnoise.core.io.xr_save_wct_dtt(root, lineage, step_name, station1, station2, components, mov_stack, taxis, dataset)

Save WCT-DTT results to a NetCDF file.

Parameters:

datasetxarray.Dataset with variables DTT, ERR, COH and dims (times, frequency), as built by s09_compute_wct_dtt.

Signal Processing

MSNoise signal processing, preprocessing helpers, and stacking utilities.

msnoise.core.signal.check_and_phase_shift(trace, taper_length=20.0)
msnoise.core.signal.compute_wct_dtt(freqs, tvec, WXamp, Wcoh, delta_t, lag_min=5, coda_cycles=20, mincoh=0.5, maxdt=0.2, min_nonzero=0.25, freqmin=0.1, freqmax=2.0)

Compute dv/v and associated errors from wavelet coherence transform results.

Parameters:
  • freqs – Frequency values from the WCT.

  • tvec – Time axis.

  • WXamp – Cross-wavelet amplitude array (freqs × taxis).

  • Wcoh – Wavelet coherence array (freqs × taxis).

  • delta_t – Time delay array (freqs × taxis).

  • lag_min – Minimum coda lag in seconds.

  • coda_cycles – Number of periods to use as coda window width.

  • mincoh – Minimum coherence threshold.

  • maxdt – Maximum allowed time delay.

  • min_nonzero – Minimum fraction of valid (non-zero weight) samples required.

  • freqmin – Lower frequency bound for regression.

  • freqmax – Upper frequency bound for regression.

Returns:

Tuple of (dt/t, err, weighting_function).

msnoise.core.signal.find_segments(data, gap_threshold)

Identify continuous non-NaN segments in an xarray DataArray.

Parameters:
  • data – 2-D xarray DataArray (times × lags).

  • gap_threshold – Maximum index gap before treating as a new segment.

Returns:

List of lists of row indices forming each continuous segment.
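The grouping logic can be sketched with plain NumPy — `find_row_segments` is a simplified stand-in operating on a boolean validity mask rather than the DataArray itself:

```python
import numpy as np

def find_row_segments(valid, gap_threshold):
    """Sketch (not the actual implementation): group indices of valid
    (non-NaN) rows into segments, starting a new segment whenever the
    index gap between consecutive valid rows exceeds gap_threshold."""
    idx = np.flatnonzero(valid)
    segments = []
    for i in idx:
        if segments and i - segments[-1][-1] <= gap_threshold:
            segments[-1].append(int(i))
        else:
            segments.append([int(i)])
    return segments

# Rows 2-4 are invalid: with gap_threshold=1 this splits into two segments.
valid = np.array([True, True, False, False, False, True, True])
```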

msnoise.core.signal.getCoherence(dcs, ds1, ds2)

Compute cross-coherence between two spectra.

Parameters:
  • dcs – Cross-spectrum magnitudes (1-D array, length n).

  • ds1 – Auto-spectrum of signal 1 (1-D array, length n).

  • ds2 – Auto-spectrum of signal 2 (1-D array, length n).

Returns:

Complex coherence array of length n, clipped to |coh| <= 1.
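The coherence formula itself is short; a minimal real-valued sketch (the actual function returns a complex-typed array and may handle zeros differently):

```python
import numpy as np

def coherence(dcs, ds1, ds2):
    """Sketch of the cross-coherence: |X| / sqrt(S1 * S2), clipped to 1,
    left at 0 wherever either auto-spectrum vanishes."""
    coh = np.zeros_like(dcs, dtype=float)
    ok = (ds1 > 0) & (ds2 > 0)
    coh[ok] = dcs[ok] / np.sqrt(ds1[ok] * ds2[ok])
    return np.clip(coh, 0.0, 1.0)
```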

msnoise.core.signal.getGaps(stream, min_gap=None, max_gap=None)
msnoise.core.signal.get_preprocessed_stream(output_dir, step_name, goal_day, stations)

Read per-station preprocessed files and return a merged Stream.

Counterpart to save_preprocessed_streams(). Reads only the station files needed for stations (a list of "NET.STA.LOC" strings) and returns them merged into a single Stream.

Missing station files are silently skipped (the cc worker checks for empty streams downstream).

Parameters:
  • output_dir – Base output directory.

  • step_name – Workflow step name (e.g. "preprocess_1").

  • goal_day – Processing date string (YYYY-MM-DD).

  • stations – List of "NET.STA.LOC" strings to load.

Returns:

Stream.

msnoise.core.signal.get_wavelet_type(wavelet_type)

Return a pycwt wavelet object for the given type/parameter pair.

Parameters:

wavelet_type – Tuple (name, param) or (name,). Supported names: "Morlet", "Paul", "DOG", "MexicanHat".

Returns:

Corresponding pycwt wavelet instance.

msnoise.core.signal.get_wct_avgcoh(freqs, tvec, wcoh, freqmin, freqmax, lag_min=5, coda_cycles=20)

Calculate average wavelet coherence over a frequency range and coda window.

Parameters:
  • freqs – Frequency array.

  • tvec – Time axis.

  • wcoh – Wavelet coherence array (freqs × taxis).

  • freqmin – Lower frequency bound.

  • freqmax – Upper frequency bound.

  • lag_min – Minimum coda lag in seconds.

  • coda_cycles – Number of periods to use as coda window width.

Returns:

Average coherence per frequency bin within [freqmin, freqmax].

msnoise.core.signal.get_window(window='boxcar', half_win=3)

Return a normalised complex smoothing window for MWCS processing.

Parameters:
  • window – "boxcar" (default) or "hanning".

  • half_win – Half-width in samples (full window = 2*half_win+1).

Returns:

Complex numpy array of length 2*half_win+1, sum-normalised.
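A NumPy-only sketch of what such a window looks like (the real helper may build the window differently, e.g. via SciPy):

```python
import numpy as np

def smoothing_window(window="boxcar", half_win=3):
    """Sketch: build a sum-normalised window of length 2*half_win+1,
    cast to complex so it can multiply complex spectra directly."""
    n = 2 * half_win + 1
    w = np.ones(n) if window == "boxcar" else np.hanning(n)
    return (w / w.sum()).astype(complex)
```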

msnoise.core.signal.make_same_length(st)

This function takes a stream of equal sampling rate and makes sure that all channels have the same length and the same gaps.

msnoise.core.signal.nextpow2(x)

Returns the next power of 2 of x.

Parameters:

x (int) – any value

Return type:

int

Returns:

the next power of 2 of x
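One common reading of "next power of 2" is the smallest power of two greater than or equal to x — a sketch (note some implementations return the exponent rather than the value):

```python
import math

def next_pow2(x):
    """Sketch: smallest power of two >= x, e.g. 1000 -> 1024.
    Assumes x >= 1."""
    return 1 << max(0, math.ceil(math.log2(x)))
```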

msnoise.core.signal.preload_instrument_responses(session, return_format='dataframe')

This function preloads all instrument responses from response_path and stores the seed ids, start and end dates, and paz for every channel in a DataFrame. Any file readable by obspy’s read_inventory will be processed.

Parameters:

return_format (str) – "dataframe" (default) to return a DataFrame; otherwise an obspy Inventory is returned.

Return type:

DataFrame or Inventory

Returns:

A table containing all channels with the time of operation and poles and zeros (DataFrame), or an obspy Inventory object.

msnoise.core.signal.prepare_abs_positive_fft(line, sampling_rate)

Return the positive-frequency part of the absolute FFT of line.

Parameters:
  • line – 1-D signal array.

  • sampling_rate – Sampling rate in Hz.

Returns:

(freq, val) - positive-frequency vector and absolute FFT values.

msnoise.core.signal.psd_df_rms(d, freqs, output='VEL')

Compute per-frequency-band RMS from PSD data.

Parameters:
  • dxarray.Dataset with a PSD variable and dims (times, periods), as returned by xr_load_psd().

  • freqs – List of (fmin, fmax) tuples defining frequency bands.

  • output – Physical unit - "VEL" (default), "ACC", or "DISP".

Returns:

xarray.Dataset with one RMS variable and dims (times, bands), ready for xr_save_rms().

msnoise.core.signal.psd_rms(s, f)

Compute RMS from a power spectrum array and frequency array.

Parameters:
  • s – Power spectral density values (1-D array).

  • f – Frequency values (1-D array, same length as s).

Returns:

Float - square-root of the integrated power.
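The computation reduces to integrating the PSD over frequency and taking the square root — a sketch using the trapezoidal rule, assuming s is in linear power units (not dB):

```python
import numpy as np

def rms_from_psd(s, f):
    """Sketch: RMS = sqrt of the PSD integrated over frequency
    (trapezoidal rule). Assumes s holds linear power values."""
    power = np.sum(0.5 * (s[1:] + s[:-1]) * np.diff(f))
    return float(np.sqrt(power))
```

A flat unit PSD over a 1 Hz band integrates to 1, so its RMS is 1.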

msnoise.core.signal.save_preprocessed_streams(stream, output_dir, step_name, goal_day)

Write preprocessed traces to per-station files.

Output layout:

<output_dir>/<step_name>/_output/<goal_day>/<NET.STA.LOC>.mseed

One file per station (all channels for that station in the same file). This is concurrency-safe: each station writes its own file with no shared state between parallel workers.

Parameters:
  • streamStream to write.

  • output_dir – Base output directory (params.global_.output_folder).

  • step_name – Workflow step name (e.g. "preprocess_1").

  • goal_day – Processing date string (YYYY-MM-DD).

Returns:

List of written file paths (one per station).

msnoise.core.signal.smoothCFS(cfs, scales, dt, ns, nt)

Smooth CWT coefficients along both time and scale axes.

Parameters:
  • cfs – CWT coefficient array, shape (n_scales, n_times).

  • scales – 1-D array of wavelet scales.

  • dt – Sampling interval in seconds.

  • ns – Length of the moving-average filter across scales.

  • nt – Gaussian width parameter along time.

Returns:

Smoothed coefficient array, same shape as cfs.

msnoise.core.signal.stack(data, stack_method='linear', pws_timegate=10.0, pws_power=2, goal_sampling_rate=20.0)
Parameters:
  • data (numpy.ndarray) – the data to stack, each row being one CCF

  • stack_method (str) – either linear: average of all CCF or pws to compute the phase weighted stack. If pws is selected, the function expects the pws_timegate and pws_power.

  • pws_timegate (float) – PWS time gate in seconds. Width of the smoothing window to convolve with the PWS spectrum.

  • pws_power (float) – Power of the PWS weights to be applied to the CCF stack.

  • goal_sampling_rate (float) – Sampling rate of the CCF array submitted

Return type:

numpy.array

Returns:

the stacked CCF.
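The "linear" branch is simply a row-wise average — a minimal sketch (the pws branch additionally weights by instantaneous phase coherence and is not shown):

```python
import numpy as np

def linear_stack(data):
    """Sketch of the 'linear' stack: plain average over rows,
    each row being one CCF."""
    return data.mean(axis=0)

ccfs = np.array([[0.0, 1.0, 0.0],
                 [0.0, 3.0, 0.0]])
```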

msnoise.core.signal.validate_stack_data(dataset, stack_type='reference')

Validates stack data before processing

Parameters:

  • dataset – xarray Dataset to validate.

  • stack_type – Type of stack ("reference" or "moving") for error messages.

Returns:

(is_valid, message) tuple

msnoise.core.signal.wiener_filt(data, M, N, gap_threshold)

Apply a 2-D Wiener filter to the CCF dataset, segment by segment.

Operates only on continuous (non-NaN) segments of the time axis to avoid smearing across data gaps.

Parameters:
  • data – xarray Dataset containing a CCF variable (times × taxis).

  • M – Wiener filter window size along the time axis.

  • N – Wiener filter window size along the lag axis.

  • gap_threshold – Passed to find_segments().

Returns:

Copy of data with the filtered CCF variable.

msnoise.core.signal.winsorizing(data, params, input='timeseries', nfft=0)

Clip (Winsorise) a 2-D data array in the time or frequency domain.

Supports both one-shot sign-clipping (winsorizing == -1) and RMS-based clipping (winsorizing > 0). When input is "fft" the array is temporarily transformed back to the time domain, clipped, then re-transformed.

Parameters:
  • data – 1-D or 2-D array of shape (n_traces, n_samples).

  • params – MSNoise params object; must expose params.cc.winsorizing.

  • input – "timeseries" (default) or "fft".

  • nfft – FFT length used when input is "fft"; ignored otherwise.

Returns:

Clipped array (same shape as input).
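The RMS-based branch (winsorizing > 0) amounts to clipping each trace at a multiple of its own RMS — a sketch without the params object or the FFT round-trip:

```python
import numpy as np

def rms_clip(data, k):
    """Sketch of RMS-based winsorising (winsorizing > 0): clip each
    row at +/- k times that row's RMS. The -1 setting in the real
    config instead keeps only the sign (one-bit normalisation)."""
    rms = np.sqrt(np.mean(data ** 2, axis=1, keepdims=True))
    return np.clip(data, -k * rms, k * rms)

# The 10.0 outlier is clipped to 1x the row RMS; small samples pass through.
clipped = rms_clip(np.array([[1.0, -1.0, 10.0]]), 1.0)
```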

msnoise.core.signal.xwt(trace_ref, trace_current, fs, ns=3, nt=0.25, vpo=12, freqmin=0.1, freqmax=8.0, nptsfreq=100, wavelet_type=('Morlet', 6.0))

Wavelet Coherence Transform (WCT) between two time series.

Convenience wrapper around prepare_ref_wct() + apply_wct(). Use those two functions directly in hot loops (Mode A fixed-REF) to avoid recomputing the reference CWT on every call.

Parameters:
  • trace_ref – Reference signal (1-D array).

  • trace_current – Current signal (1-D array, same length).

  • fs – Sampling frequency in Hz.

  • ns – Scale-axis smoothing parameter.

  • nt – Time-axis smoothing parameter.

  • vpo – Voices-per-octave; higher = finer scale resolution.

  • freqmin – Lowest frequency of interest (Hz).

  • freqmax – Highest frequency of interest (Hz).

  • nptsfreq – Number of frequency points between freqmin and freqmax.

  • wavelet_type – (name, param) tuple passed to get_wavelet_type().

Returns:

(WXamp, WXspec, WXangle, Wcoh, WXdt, freqs, coi)

Computation (CC / MWCS / WCT)

MSNoise core computation functions — cross-correlation, whitening, MWCS.

Moved from msnoise/move2obspy.py.

msnoise.core.compute.mwcs(current, reference, freqmin, freqmax, df, tmin, window_length, step, smoothing_half_win=5)

The current time series is compared to the reference. Both time series are sliced into several overlapping windows. Each slice is mean-adjusted and cosine-tapered (85% taper) before being Fourier-transformed to the frequency domain. \(F_{cur}(\nu)\) and \(F_{ref}(\nu)\) are the first halves of the Hermitian symmetric Fourier-transformed segments. The cross-spectrum \(X(\nu)\) is defined as \(X(\nu) = F_{ref}(\nu) F_{cur}^*(\nu)\)

in which \({}^*\) denotes the complex conjugation. \(X(\nu)\) is then smoothed by convolution with a Hanning window. The similarity of the two time-series is assessed using the cross-coherency between energy densities in the frequency domain:

\(C(\nu) = \frac{|\overline{X(\nu)}|}{\sqrt{\overline{|F_{ref}(\nu)|^2}\ \overline{|F_{cur}(\nu)|^2}}}\)

in which the over-line here represents the smoothing of the energy spectra for \(F_{ref}\) and \(F_{cur}\) and of the spectrum of \(X\). The mean coherence for the segment is defined as the mean of \(C(\nu)\) in the frequency range of interest. The time-delay between the two cross correlations is found in the unwrapped phase, \(\phi(\nu)\), of the cross spectrum and is linearly proportional to frequency:

\(\phi_j = m \cdot \nu_j, \quad m = 2 \pi \delta t\)

The time shift for each window between two signals is the slope \(m\) of a weighted linear regression of the samples within the frequency band of interest. The weights are those introduced by [Clarke2011], which incorporate both the cross-spectral amplitude and cross-coherence, unlike [Poupinet1984]. The errors are estimated using the weights (thus the coherence) and the squared misfit to the modelled slope:

\(e_m = \sqrt{\sum_j{(\frac{w_j \nu_j}{\sum_i{w_i \nu_i^2}})^2}\sigma_{\phi}^2}\)

where \(w\) are weights, \(\nu\) are cross-coherences and \(\sigma_{\phi}^2\) is the squared misfit of the data to the modelled slope and is calculated as \(\sigma_{\phi}^2 = \frac{\sum_j(\phi_j - m \nu_j)^2}{N-1}\)

The output of this process is a table containing, for each moving window: the central time lag, the measured delay, its error and the mean coherence of the segment.

Warning

The time series will not be filtered before computing the cross-spectrum! They should be band-pass filtered around the freqmin-freqmax band of interest beforehand.

Parameters:
  • current (numpy.ndarray) – The “Current” timeseries

  • reference (numpy.ndarray) – The “Reference” timeseries

  • freqmin (float) – The lower frequency bound to compute the dephasing (in Hz)

  • freqmax (float) – The higher frequency bound to compute the dephasing (in Hz)

  • df (float) – The sampling rate of the input timeseries (in Hz)

  • tmin (float) – The leftmost time lag (used to compute the “time lags array”)

  • window_length (float) – The moving window length (in seconds)

  • step (float) – The step to jump for the moving window (in seconds)

  • smoothing_half_win (int) – If different from 0, defines the half length of the smoothing hanning window.

Return type:

numpy.ndarray

Returns:

[time_axis,delta_t,delta_err,delta_mcoh]. time_axis contains the central times of the windows. The three other columns contain dt, error and mean coherence for each window.
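The final step of the procedure above — recovering the time shift from the phase slope — can be sketched as a weighted zero-intercept regression (`phase_slope_dt` is an illustrative helper, not the library function; the real weights follow Clarke et al. 2011):

```python
import numpy as np

def phase_slope_dt(phi, nu, w):
    """Sketch of the final MWCS step: weighted regression (through the
    origin) of unwrapped phase phi on frequency nu; the slope
    m = 2*pi*dt gives the time delay dt."""
    m = np.sum(w * phi * nu) / np.sum(w * nu ** 2)
    return m / (2.0 * np.pi)
```

A phase that grows exactly as \(2\pi \cdot 0.01 \cdot \nu\) should yield dt = 0.01 s.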

msnoise.core.compute.myCorr2(data, maxlag, energy, index, plot=False, nfft=None, normalized=False)

Compute cross-correlations for all requested station pairs.

Tiled-batch implementation: processes pairs in chunks of _MYCORR2_CHUNK to amortise Python loop overhead while keeping peak memory bounded. Faster than the original per-pair loop for N > ~50 stations (~2-3x for N=200-500); falls back gracefully to near-loop speed for small N.

Parameters:
  • data (numpy.ndarray) – 2-D array (n_stations, nfft) containing the FFT of each pre-whitened time series.

  • maxlag (int) – Output half-length in samples; CCF returned over [-maxlag : maxlag] (length 2*maxlag + 1).

  • energy (numpy.ndarray) – Per-station RMS energy (n_stations,) used for POW normalisation.

  • index (list) – List of (ccf_id, sta1_idx, sta2_idx) tuples.

  • normalized (str or bool) – "POW", "MAX", "ABSMAX", or falsy for none.

Return type:

dict

Returns:

{ccf_id: ccf_array} for every pair in index.
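For a single pair, the frequency-domain cross-correlation reduces to a spectral product and an inverse FFT — a sketch without the tiling, normalisation, or energy handling:

```python
import numpy as np

def fd_xcorr(fft1, fft2, maxlag, nfft):
    """Sketch of one pair of the frequency-domain cross-correlation:
    multiply one spectrum by the conjugate of the other, inverse-FFT,
    then cut out the [-maxlag, +maxlag] lag window."""
    corr = np.real(np.fft.ifft(fft1 * np.conj(fft2), nfft))
    corr = np.fft.fftshift(corr)  # move lag 0 to the centre
    mid = nfft // 2
    return corr[mid - maxlag: mid + maxlag + 1]
```

Correlating a trace with itself peaks at zero lag, i.e. the centre sample of the returned window.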

msnoise.core.compute.pcc_xcorr(data, maxlag, energy, index, plot=False, nfft=None, normalized=False)

Phase Cross-Correlation v=2 (PCC2) — pure NumPy/SciPy implementation.

Replaces the former dependency on the unmaintained phasecorr package with a self-contained translation of the FFT-accelerated pcc2_set routine from FastPCC (Ventosa et al., SRL 2019; IEEE-TGRS 2023).

Algorithm (matches FastPCC pcc2_set):

  1. Compute the amplitude-normalised analytic signal (phase signal) φ[n] = X_a[n] / |X_a[n]| for each trace — amplitude information is discarded entirely, so the result is insensitive to amplitude transients (earthquakes, glitches) without explicit temporal normalisation.

  2. Zero-pad to Nz = next_fast_len(N + maxlag) to avoid circular wrap-around (linear cross-correlation).

  3. Compute PCC2(lag) = IFFT(conj(FFT(φ1)) · FFT(φ2)) / (Nz · N) for every pair in index — O(N log N), same cost as GNCC.

Parameters:
  • data (numpy.ndarray) – 2-D time-domain array (n_stations, N); real-valued. Unlike myCorr2(), PCC2 requires the time-domain input because the Hilbert transform must be computed before any FFT.

  • maxlag (int or float) – Half-length of output CCF in samples.

  • energy – Unused (kept for API compatibility with myCorr2()).

  • index – List of (ccf_id, sta1_idx, sta2_idx) tuples.

  • normalized – "MAX" or "ABSMAX" to normalise output; falsy for none. ("POW" is meaningless for PCC2 since amplitudes are discarded; it is silently ignored.)

Return type:

dict

Returns:

{ccf_id: ccf_array} of length 2*maxlag + 1 per pair.

References

Ventosa S., Schimmel M. & E. Stutzmann, 2019. SRL 90(4):1663-1669. https://doi.org/10.1785/0220190022

Ventosa S. & M. Schimmel, 2023. IEEE-TGRS 61:1-17. https://doi.org/10.1109/TGRS.2023.3294302
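Step 1 of the algorithm — the amplitude-normalised analytic (phase) signal — can be sketched with a NumPy-only Hilbert transform (`analytic_phase` is illustrative; the real routine also handles padding and the per-pair FFT products):

```python
import numpy as np

def analytic_phase(x):
    """NumPy-only Hilbert transform (even-length input assumed): build
    the analytic signal, then discard amplitude - step 1 of PCC2."""
    n = len(x)
    X = np.fft.fft(x)
    h = np.zeros(n)
    h[0] = 1.0
    h[1:n // 2] = 2.0
    h[n // 2] = 1.0
    xa = np.fft.ifft(X * h)
    return xa / np.abs(xa)

# Phase signal of a pure cosine has unit amplitude everywhere, and the
# zero-lag PCC of a trace with itself is 1 by construction.
p = analytic_phase(np.cos(2 * np.pi * np.arange(64) / 8.0))
pcc_zero_lag = np.real(np.vdot(p, p)) / len(p)
```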

msnoise.core.compute.smooth(x, window='boxcar', half_win=3)

Smooth a 1-D array by convolution with a normalised "boxcar" (default) or "hanning" window of half-width half_win samples.

msnoise.core.compute.whiten(data, Nfft, delta, freqmin, freqmax, plot=False, returntime=False)

This function takes a 1-dimensional timeseries array, transforms it to the frequency domain using the FFT, whitens the amplitude of the spectrum between freqmin and freqmax, and returns the whitened FFT.

Parameters:
  • data (numpy.ndarray) – Contains the 1D time series to whiten

  • Nfft (int) – The number of points to compute the FFT

  • delta (float) – The sampling frequency of the data

  • freqmin (float) – The lower frequency bound

  • freqmax (float) – The upper frequency bound

  • plot (bool) – Whether to show a raw plot of the action (default: False)

Return type:

numpy.ndarray

Returns:

The FFT of the input trace, whitened between the frequency bounds
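The core of spectral whitening is keeping only the phase inside the band — a simplified sketch (the real function additionally applies cosine tapers at the band edges rather than a hard cut):

```python
import numpy as np

def whiten_band(data, nfft, delta, freqmin, freqmax):
    """Sketch of spectral whitening: FFT the trace, set unit amplitude
    (keep phase only) inside [freqmin, freqmax], zero outside.
    delta is the sampling frequency in Hz."""
    spec = np.fft.fft(data, nfft)
    freqs = np.fft.fftfreq(nfft, d=1.0 / delta)
    band = (np.abs(freqs) >= freqmin) & (np.abs(freqs) <= freqmax)
    out = np.zeros_like(spec)
    out[band] = np.exp(1j * np.angle(spec[band]))
    return out
```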

msnoise.core.compute.whiten2(fft, Nfft, low, high, porte1, porte2, psds, whiten_type)

Variant of whiten() that operates on an already Fourier-transformed spectrum: the amplitude is flattened between the low and high frequency indices, with edge tapers applied.

Parameters:
  • fft (numpy.ndarray) – The FFT of the time series to whiten

  • Nfft (int) – The number of points of the FFT

  • low (int) – Index of the lower frequency bound in the spectrum

  • high (int) – Index of the upper frequency bound in the spectrum

  • porte1 – Taper window for the lower band edge

  • porte2 – Taper window for the upper band edge

  • psds – Power spectral densities, used by PSD-based whitening

  • whiten_type (str) – The whitening method to apply

Return type:

numpy.ndarray

Returns:

The whitened FFT

msnoise.core.compute.compute_wct_dtt_batch(freqs, taxis, WXamp, Wcoh, WXdt, dtt_params, dist: float = 0.0)

Compute WCT dt/t and average coherence for one time-step.

Shared implementation used by both the fused path in s08 (where WXamp/Wcoh/WXdt are freshly computed in memory) and the standalone s09 path (where they are loaded from a WCT NetCDF file).

Parameters:
  • freqs – 1-D frequency array from the WCT (Hz).

  • taxis – 1-D lag-time axis array (s).

  • WXamp – 2-D cross-wavelet amplitude array (freqs, taxis).

  • Wcoh – 2-D wavelet coherence array (freqs, taxis).

  • WXdt – 2-D time-delay array (freqs, taxis).

  • dtt_params – Merged params object containing wavelet_dtt.* attributes.

  • dist – Interstation distance in km (for dynamic lag). Default 0.

Returns:

Tuple (dtt_row, err_row, coh_row, freqs_subset) where dtt_row and err_row are 1-D arrays over the DTT frequency subset and coh_row is the average coherence per frequency bin.

msnoise.core.compute.resolve_wct_lag_min(dtt_params, dist: float) → float

Resolve the coda lag minimum for WCT dt/t computation.

Parameters:
  • dtt_params – Params object with wavelet_dtt attributes.

  • dist – Interstation distance in km (used for dynamic lag).

Returns:

Lag minimum in seconds.

msnoise.core.compute.build_wct_dtt_dataset(dates_list, dtt_rows, err_rows, coh_rows, freqs_subset)

Build a sorted xarray Dataset of WCT dt/t results.

Shared between the fused save path in s08 and the save path in s09.

Parameters:
  • dates_list – List of datetime64 timestamps.

  • dtt_rows – List of 1-D dt/t arrays (one per date).

  • err_rows – List of 1-D error arrays.

  • coh_rows – List of 1-D average-coherence arrays.

  • freqs_subset – 1-D frequency array for the DTT band.

Returns:

xarray.Dataset with variables DTT, ERR, COH and dims (times, frequency), sorted by times.

Preprocessing

Waveform Pre-processing

Pairs are first split and a station list is created. The database is then queried to get file paths. For each station, all files potentially containing data for the day are opened. The traces are then merged and split to obtain the most continuous chunks possible. The different chunks are then demeaned, tapered and merged again into a 1-day long trace. If a chunk is not aligned on the sampling grid (that is, does not start at an integer multiple of the sample spacing, in seconds), it is phase-shifted in the frequency domain, which requires tapering and an FFT/iFFT round trip. If the gap between two chunks is small compared to preprocess_max_gap, the gap is filled with interpolated values; larger gaps are left unfilled.

Each 1-day long trace is then high-passed (at preprocess_highpass Hz), then, if needed, low-passed (at preprocess_lowpass Hz) and decimated/downsampled. Decimation/downsampling is configurable (resampling_method) and users are advised to test Decimate. One advantage of Downsampling over Decimation is that it can downsample the data by any factor, not only integer factors. Downsampling is achieved with the ObsPy Lanczos resampler, which we tested against the old scikits.samplerate.

If configured, each 1-day long trace is corrected for its instrument response. Currently, only dataless seed and inventory XML are supported.

(Figure: overview of the preprocessing workflow.)

As from MSNoise 1.5, the preprocessing routine is separated from the compute_cc and can be used by plugins with their own parameters. The routine returns a Stream object containing all the traces for all the stations/components.

msnoise.core.preprocessing.apply_preprocessing_to_stream(stream, params, responses=None, logger=None)

Apply MSNoise signal-processing pipeline to a raw ObsPy Stream.

This is the core preprocessing logic extracted from preprocess() so it can be used for both local-archive and FDSN/EIDA data paths.

Operations applied (in order):

  1. Phase-shift alignment

  2. Gap detection and filling (up to preprocess_max_gap samples)

  3. Sampling-rate check and down-sampling (Resample / Decimate / Lanczos)

  4. Detrend, taper, highpass / lowpass filter

  5. Instrument response removal (if remove_response is set)

Parameters:
  • stream – Raw Stream for one station.

  • paramsMSNoiseParams for this lineage.

  • responses – ObsPy Inventory for instrument response removal, or None.

  • logger – Python logger (defaults to msnoise.preprocessing).

Returns:

Processed Stream, or an empty stream if the data could not be processed.

msnoise.core.preprocessing.preprocess(stations, comps, goal_day, params, responses=None, loglevel='INFO', logger='msnoise.compute_cc_norot_child')

Fetches data for each station and component in stations and comps, using the data_availability table in the database.

To correct for instrument responses, make sure to set remove_response to “Y” in the config and to provide the responses DataFrame.

Example:

>>> from msnoise.api import connect, get_params, preload_instrument_responses
>>> from msnoise.core.preprocessing import preprocess
>>> db = connect()
>>> params = get_params(db)
>>> responses = preload_instrument_responses(db)
>>> st = preprocess(["YA.UV06", "YA.UV10"], ["Z"], "2010-09-01", params, responses, loglevel="INFO")
>>> st
 2 Trace(s) in Stream:
YA.UV06.00.HHZ | 2010-09-01T00:00:00.000000Z - 2010-09-01T23:59:59.950000Z | 20.0 Hz, 1728000 samples
YA.UV10.00.HHZ | 2010-09-01T00:00:00.000000Z - 2010-09-01T23:59:59.950000Z | 20.0 Hz, 1728000 samples
Parameters:
  • db (sqlalchemy.orm.session.Session) – A Session object, as obtained by msnoise.api.connect().

  • stations (list of str) – a list of station names, in the format NET.STA.LOC

  • comps (list of str) – a list of component names, among Z, N, E, 1, 2.

  • goal_day (str) – the day of data to load, ISO 8601 format: e.g. 2016-12-31.

  • params (class) – an object containing the config parameters, as obtained by msnoise.api.get_params().

  • responses (pandas.DataFrame) – a DataFrame containing the instrument responses, as obtained by msnoise.api.preload_instrument_responses().

Return type:

obspy.core.stream.Stream

Returns:

A Stream object containing all traces.

Stations & Data Sources

MSNoise station and data-availability management.

msnoise.core.stations.ARCHIVE_STRUCTURES = {'BUD': 'NET/STA/STA.NET.LOC.CHAN.YEAR.DAY', 'IDDS': 'YEAR/NET/STA/CHAN.TYPE/DAY/NET.STA.LOC.CHAN.TYPE.YEAR.DAY.HOUR', 'PDF': 'YEAR/STA/CHAN.TYPE/NET.STA.LOC.CHAN.TYPE.YEAR.DAY', 'SDS': 'YEAR/NET/STA/CHAN.TYPE/NET.STA.LOC.CHAN.TYPE.YEAR.DAY'}

Archive path templates keyed by structure name. Used by s01_scan_archive() and populate_stations().
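As an illustration, such a template can be expanded by plain token substitution. This is a sketch under assumptions: expand_structure is not an MSNoise function, and only two of the four templates are reproduced here:

```python
ARCHIVE_STRUCTURES = {
    "SDS": "YEAR/NET/STA/CHAN.TYPE/NET.STA.LOC.CHAN.TYPE.YEAR.DAY",
    "BUD": "NET/STA/STA.NET.LOC.CHAN.YEAR.DAY",
}

def expand_structure(template: str, **fields: str) -> str:
    """Replace each placeholder token with its value.

    Longer tokens are substituted first so that e.g. 'YEAR' is
    fully replaced before any shorter token is considered."""
    for token in sorted(fields, key=len, reverse=True):
        template = template.replace(token, fields[token])
    return template
```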

msnoise.core.stations.add_data_source(session, name, uri='', data_structure='SDS', auth_env='MSNOISE')

Add a new DataSource to the database.

Parameters:
  • session – SQLAlchemy session.

  • name – Human label e.g. "local", "IRIS", "EIDA".

  • uri – Data location URI. Supported schemes:
      - bare path or sds:///path → local SDS archive
      - fdsn://http://... → FDSN web service
      - eida://http://... → EIDA routing client

  • data_structure – SDS sub-path format for local sources (default "SDS"). Ignored for FDSN/EIDA.

  • auth_env – Environment variable prefix for credentials (default "MSNOISE"). Worker looks up {auth_env}_FDSN_USER, {auth_env}_FDSN_PASSWORD, {auth_env}_FDSN_TOKEN.

Returns:

The created DataSource object.

msnoise.core.stations.check_stations_uniqueness(session, station)
Parameters:
  • session

  • station

Returns:

msnoise.core.stations.get_data_availability(session, net=None, sta=None, loc=None, chan=None, starttime=None, endtime=None)

Returns the DataAvailability objects matching the given net, sta, loc, chan, starttime and endtime filters.

Parameters:
Return type:

list

Returns:

list of DataAvailability

msnoise.core.stations.get_data_source(session, id=None, name=None)

Retrieve a DataSource by id or name.

Parameters:
  • session – SQLAlchemy session.

  • id – Primary key of the DataSource.

  • name – Human label of the DataSource.

Returns:

DataSource or None.

Raises:

ValueError – If neither id nor name is provided.

msnoise.core.stations.get_default_data_source(session)

Return the project default DataSource.

The default is the DataSource with ref=1 (created by the installer).

Parameters:

session – SQLAlchemy session.

Returns:

DataSource.

Raises:

RuntimeError – If no DataSource exists (installer not run).

msnoise.core.stations.get_interstation_distance(station1, station2, coordinates='DEG')

Returns the distance in km between station1 and station2.

Warning

Currently both stations' coordinate systems have to be the same!

Parameters:
  • station1 (Station) – A Station object

  • station2 (Station) – A Station object

  • coordinates (str) – The coordinates system. “DEG” is WGS84 latitude/ longitude in degrees. “UTM” is expressed in meters.

Return type:

float

Returns:

The interstation distance in km
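For the "DEG" case the computation reduces to a great-circle distance; a self-contained haversine sketch, assuming a spherical Earth of radius 6371 km (MSNoise itself may rely on ObsPy's geodetics utilities):

```python
from math import radians, sin, cos, asin, sqrt

def haversine_km(lat1, lon1, lat2, lon2, radius_km=6371.0):
    """Great-circle distance in km between two points given in degrees."""
    lat1, lon1, lat2, lon2 = map(radians, (lat1, lon1, lat2, lon2))
    a = (sin((lat2 - lat1) / 2) ** 2
         + cos(lat1) * cos(lat2) * sin((lon2 - lon1) / 2) ** 2)
    return 2 * radius_km * asin(sqrt(a))

# One degree of longitude at the equator is roughly 111.2 km.
```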

msnoise.core.stations.get_new_files(session)

Returns the files marked “N”ew or “M”odified in the database

Parameters:

session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()

Return type:

list

Returns:

list of DataAvailability

msnoise.core.stations.get_station(session, net, sta)

Get one Station from the database.

Parameters:
Return type:

msnoise.msnoise_table_def.declare_tables.Station

Returns:

a Station Object

msnoise.core.stations.get_station_pairs(session, used=None, net=None, include_single_station=False)

Returns an iterator over all possible station pairs. If auto-correlation is configured in the database, returns N*N pairs, otherwise returns N*(N-1)/2 pairs.

Parameters:
  • session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()

  • used (bool, int) – Select only stations marked used if False (default) or all stations present in the database if True

  • net (str) – Network code to filter for the pairs.

Return type:

iterable

Returns:

An iterable of Station object pairs
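The two pair counts quoted above can be reproduced with itertools. The helper below is illustrative only; the real function yields Station objects drawn from the session:

```python
from itertools import combinations, product

def station_pairs(stations, autocorr=False):
    """N*N ordered pairs when auto-correlation is enabled,
    otherwise the N*(N-1)/2 unordered combinations."""
    if autocorr:
        return list(product(stations, repeat=2))
    return list(combinations(stations, 2))
```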

msnoise.core.stations.get_stations(session, all=False, net=None, format='raw')

Get Stations from the database.

Parameters:
  • session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()

  • all (bool) – Returns all stations from the database if True, or only stations where used = 1 if False (default)

  • net (str) – if set, limits the stations returned to this network

Return type:

list of msnoise.msnoise_table_def.declare_tables.Station

Returns:

list of Station

msnoise.core.stations.get_waveform_path(session, da)

Reconstruct the full absolute path for a DataAvailability record.

Joins DataSource.uri with da.path and da.file. If the DA record has no data_source_id (legacy or NULL), falls back to treating da.path as an absolute path (backward-compatible behaviour).

Parameters:
Returns:

Absolute path string to the waveform file.
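The join-with-fallback behaviour might be sketched as follows. The helper signature and the sds:// prefix handling are assumptions for illustration; the real function takes the session and the ORM record:

```python
import os

def build_waveform_path(source_uri, da_path, da_file):
    """Join the DataSource root with the record's relative path and file.

    A missing/empty source_uri falls back to treating da_path as an
    absolute path (legacy records without data_source_id)."""
    if not source_uri:
        return os.path.join(da_path, da_file)
    root = (source_uri[len("sds://"):]
            if source_uri.startswith("sds://") else source_uri)
    return os.path.join(root, da_path, da_file)
```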

msnoise.core.stations.import_stationxml(session, path_or_url, data_source_id=None, save_to_response_path=True)

Import stations from a StationXML file or URL.

Parses the inventory with ObsPy and creates or updates Station rows. Also populates used_location_codes and used_channel_names from the channel-level inventory (requires level=channel in FDSN queries). Empty location codes are stored as "--" (MSNoise convention).

When save_to_response_path is True (default), the parsed inventory is written as a StationXML file into the project’s response_path directory (read from global config). This makes the instrument responses immediately available to the preprocessing step without any manual file copying. The file is named <NET>.<STA>.xml for single-network inventories, or inventory_<timestamp>.xml when the inventory spans multiple networks. Saving failures are logged as warnings but never abort the import.

Parameters:
  • session – SQLAlchemy session.

  • path_or_url – Path to a local StationXML file, or a URL (e.g. an FDSN station web service query URL with level=channel).

  • data_source_id – DataSource to assign to imported stations. None → project default (DataSource.ref=1).

  • save_to_response_path – Write the inventory to response_path after a successful import (default True).

Returns:

Tuple (created, updated, saved_path) where saved_path is the absolute path of the written file, or None if not saved.

msnoise.core.stations.list_data_sources(session)

Return all DataSource rows.

Parameters:

session – SQLAlchemy session.

Returns:

List of DataSource.

msnoise.core.stations.mark_data_availability(session, net, sta, flag)

Updates the flag of all DataAvailability objects matching net.sta in the database

Parameters:
  • session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()

  • net (str) – Network code

  • sta (str) – Station code

  • flag (str) – Status of the DataAvailability object: New, Modified or Archive. Values accepted are {‘N’, ‘M’, ‘A’}

msnoise.core.stations.read_waveforms_from_availability(session, da_records, t_start, t_end, logger=None)

Read waveforms from a list of DataAvailability records into a Stream.

Builds the full path for each record via get_waveform_path() (honouring the DataSource root), deduplicates paths, and reads each file sliced to [t_start, t_end] using ObsPy. Files that cannot be read are skipped with a debug-level log message.

This is the canonical “DA records → waveform Stream” helper that any worker (PSD, future steps) should use instead of hand-rolling os.path.join(f.path, f.file).

Parameters:
  • session – SQLAlchemy session.

  • da_records – Iterable of DataAvailability ORM objects.

  • t_start – UTCDateTime start.

  • t_end – UTCDateTime end.

  • logger – Optional logging.Logger; uses module logger if None.

Returns:

Stream (may be empty).

msnoise.core.stations.resolve_data_source(session, station)

Return the effective DataSource for a station.

If the station has data_source_id set, that DataSource is returned. Otherwise the project default (ref=1) is used.

Parameters:
  • session – SQLAlchemy session.

  • station – Station ORM object.

Returns:

DataSource.

msnoise.core.stations.set_all_stations_source(session, data_source_id)

Assign a DataSource to every station in the database.

Parameters:
  • session – SQLAlchemy session.

  • data_source_id – Primary key of the DataSource. None reverts all to default.

msnoise.core.stations.set_network_source(session, net, data_source_id)

Assign a DataSource to all stations of a network.

Parameters:
  • session – SQLAlchemy session.

  • net – Network code.

  • data_source_id – Primary key of the DataSource. None reverts to default.

msnoise.core.stations.set_station_source(session, net, sta, data_source_id)

Assign a specific DataSource to a station.

Parameters:
  • session – SQLAlchemy session.

  • net – Network code.

  • sta – Station code.

  • data_source_id – Primary key of the DataSource to assign. Pass None to revert to the project default.

Raises:

ValueError – If the station or DataSource does not exist.

msnoise.core.stations.to_sds(stats, year, jday)

Build an SDS-format relative file path from ObsPy trace stats.

Returns a path string of the form:

YYYY/NET/STA/CHAN.D/NET.STA.LOC.CHAN.D.YYYY.DDD
Parameters:
  • stats – obspy.core.trace.Stats object.

  • year – 4-digit year integer.

  • jday – Julian day-of-year integer (1-366).

Returns:

Relative SDS path string.

msnoise.core.stations.update_data_availability(session, net, sta, loc, chan, path, file, starttime, endtime, data_duration, gaps_duration, samplerate, data_source_id=None)

Updates a DataAvailability object in the database

Parameters:
  • session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()

  • net (str) – The network code of the Station

  • sta (str) – The station code

  • chan (str) – The component (channel)

  • path (str) – Path to the folder containing the file, relative to DataSource.uri.

  • file (str) – The name of the file

  • starttime (datetime.datetime) – Start time of the file

  • endtime (datetime.datetime) – End time of the file

  • data_duration (float) – Cumulative duration of available data in the file

  • gaps_duration (float) – Cumulative duration of gaps in the file

  • samplerate (float) – Sample rate of the data in the file (in Hz)

  • data_source_id (int or None) – FK to DataSource. None → project default.

msnoise.core.stations.update_data_source(session, id, **kwargs)

Update fields on an existing DataSource.

Parameters:
  • session – SQLAlchemy session.

  • id – Primary key of the DataSource to update.

  • kwargs – Fields to update: name, uri, data_structure, auth_env.

Returns:

Updated DataSource.

Raises:

ValueError – If the DataSource does not exist.

msnoise.core.stations.update_station(session, net, sta, X, Y, altitude, coordinates='UTM', instrument='N/A', used=1)

Updates or inserts a Station in the database.

See also

msnoise.msnoise_table_def.declare_tables.Station

Parameters:
  • session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()

  • net (str) – The network code of the Station

  • sta (str) – The station code

  • X (float) – The X coordinate of the station (Easting or Longitude)

  • Y (float) – The Y coordinate of the station (Northing or Latitude)

  • altitude (float) – The altitude of the station

  • coordinates (str) – The coordinates system. “DEG” is WGS84 latitude/ longitude in degrees. “UTM” is expressed in meters.

  • instrument (str) – The instrument code, useful with PAZ correction

  • used (bool) – Whether this station must be used in the computations.

msnoise.core.stations.populate_stations(db, loglevel='INFO')

Scan the default DataSource archive and populate the Station table.

Walks the archive directory according to the configured data_structure (SDS, BUD, IDDS, PDF, or a custom format). Station coordinates are initialised to zero — use import_stationxml() afterwards to set real coordinates.

For custom data structures, a custom.py file in the current working directory must export a populate(data_folder) function that returns a {NET_STA: [net, sta, lon, lat, alt, coordinates]} dictionary.

Parameters:
  • db – SQLAlchemy session.

  • loglevel – Logging verbosity (default "INFO").

Returns:

True on success.
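A minimal custom.py could look like this. The returned entry is a placeholder; a real implementation would scan data_folder for stations:

```python
# custom.py -- must sit in the current working directory
def populate(data_folder):
    """Return a {NET_STA: [net, sta, lon, lat, alt, coordinates]} dict.

    This sketch ignores data_folder and returns one hard-coded
    placeholder entry."""
    return {
        "XX_STA01": ["XX", "STA01", 0.0, 0.0, 0.0, "DEG"],
    }
```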

FDSN / EIDA Data Access

FDSN/EIDA waveform fetching for the MSNoise preprocess step.

This module handles fetching raw waveforms from FDSN web services or the EIDA routing client, writing optional raw caches, and per-station error handling. It is the only place in MSNoise that makes network calls to external data services.

msnoise.core.fdsn.build_client(ds)

Build an ObsPy client for the given DataSource.

Parameters:

ds – DataSource ORM object.

Returns:

An ObsPy client with a get_waveforms_bulk method.

msnoise.core.fdsn.fetch_and_preprocess(db, jobs, goal_day, params, responses=None, loglevel='INFO')

Fetch waveforms from FDSN/EIDA for a batch of stations on one day.

Groups jobs by DataSource (all jobs in a batch share the same data_source_id when group_by="day_lineage_datasource" is used), issues one get_waveforms_bulk call, splits the result by station, and optionally writes raw files.

Parameters:
  • db – SQLAlchemy session.

  • jobs – List of Job ORM objects (all same day, same DataSource).

  • goal_day – Date string YYYY-MM-DD.

  • params – MSNoiseParams for this lineage.

  • responses – ObsPy Inventory for instrument response removal, or None.

  • loglevel – Logging level string.

Returns:

Tuple (stream, done_jobs, failed_jobs) where stream contains preprocessed traces for all successfully fetched stations.
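The per-DataSource batching can be sketched with a plain dictionary; group_jobs_by_source is an illustrative name, not an MSNoise function:

```python
from collections import defaultdict

def group_jobs_by_source(jobs):
    """Bucket jobs by data_source_id so that each bucket can be
    served by a single get_waveforms_bulk call."""
    buckets = defaultdict(list)
    for job in jobs:
        buckets[job.data_source_id].append(job)
    return dict(buckets)
```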

msnoise.core.fdsn.fetch_raw_waveforms(db, jobs, goal_day, params, t_start=None, t_end=None)

Fetch raw (unprocessed) waveforms from FDSN/EIDA for a batch of stations.

Issues one get_waveforms_bulk call covering all stations in jobs, optionally writes a raw file cache (fdsn_keep_raw=Y), and returns the combined Stream. No preprocessing is applied — the caller receives exactly what the FDSN server returns.

Use this when downstream processing needs raw data (e.g. PSD computation via ObsPy PPSD, which handles its own response correction internally). For pre-processing + response removal use fetch_and_preprocess().

Parameters:
  • db – SQLAlchemy session.

  • jobs – List of Job ORM objects (all same DataSource).

  • goal_day – Date string YYYY-MM-DD.

  • params – MSNoiseParams for this lineage.

  • t_start – Optional UTCDateTime override for the fetch window start (default: midnight of goal_day).

  • t_end – Optional UTCDateTime override for the fetch window end (default: midnight + 86400 s).

Returns:

Stream (may be empty on failure).

msnoise.core.fdsn.fetch_waveforms_bulk(client, bulk_request, retries=3)

Issue a bulk waveform request with retry logic.

Parameters:
  • client – ObsPy FDSN/EIDA client.

  • bulk_request – List of (net, sta, loc, chan, t1, t2) tuples.

  • retries – Number of retry attempts for transient errors.

Returns:

Stream (may be empty on failure).

Raises:

Re-raises auth errors and no-data exceptions immediately.
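The retry behaviour described above might look like this in outline. The exception class and backoff policy are assumptions of this sketch, not the actual implementation:

```python
import time

def fetch_with_retries(request_fn, retries=3, backoff=2.0):
    """Call request_fn(), retrying transient network failures.

    Anything other than ConnectionError propagates immediately,
    mirroring the auth/no-data behaviour described above."""
    for attempt in range(retries):
        try:
            return request_fn()
        except ConnectionError:
            if attempt == retries - 1:
                return None  # exhausted: caller gets an empty result
            time.sleep(backoff * (attempt + 1))
    return None
```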

msnoise.core.fdsn.is_remote_source(uri: str) bool

Return True if the DataSource URI points to a remote service.

msnoise.core.fdsn.parse_datasource_scheme(uri: str) str

Return the scheme of a DataSource URI.

Parameters:

uri – DataSource.uri string.

Returns:

One of "local", "sds", "fdsn", "eida".
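A scheme classifier along these lines would suffice (a sketch; the real parser may differ in detail):

```python
def parse_scheme(uri: str) -> str:
    """Classify a DataSource URI into local / sds / fdsn / eida."""
    for scheme in ("fdsn", "eida", "sds"):
        if uri.startswith(scheme + "://"):
            return scheme
    return "local"  # bare filesystem path
```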

msnoise.core.fdsn.get_auth(auth_env: str) dict

Read credentials from environment variables for the given prefix.

Looks up:

  • {auth_env}_FDSN_USER

  • {auth_env}_FDSN_PASSWORD

  • {auth_env}_FDSN_TOKEN (path to an EIDA token file, or the token string itself)

Parameters:

auth_env – Env var prefix (e.g. "MSNOISE", "IRIS").

Returns:

Dict with keys user, password, token (any may be None).
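The lookup can be sketched with plain os.environ access; read_auth is an illustrative name for what the real helper does:

```python
import os

def read_auth(auth_env: str = "MSNOISE") -> dict:
    """Collect FDSN credentials from {auth_env}_FDSN_* environment
    variables; unset variables yield None."""
    return {
        "user": os.environ.get(f"{auth_env}_FDSN_USER"),
        "password": os.environ.get(f"{auth_env}_FDSN_PASSWORD"),
        "token": os.environ.get(f"{auth_env}_FDSN_TOKEN"),
    }
```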