Core Modules
The msnoise.core package contains all internal implementation modules.
Plugin authors should import from msnoise.plugins for a stable API.
Database & Connection
MSNoise database connection and logging utilities.
- msnoise.core.db.connect(inifile=None)
Establishes a connection to the database and returns a Session object.
- Parameters:
inifile (string) – The path to the db.ini file to use. Defaults to os.getcwd() + db.ini
- Return type:
sqlalchemy.orm.session.Session
- Returns:
Session object, needed for many of the other API methods.
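Under the hood, connect() hands back a standard SQLAlchemy Session. A minimal sketch of what that looks like, assuming an in-memory SQLite URL (the real function builds the URL from the db.ini parameters instead):

```python
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

def connect_sketch(url="sqlite:///:memory:"):
    # Hypothetical stand-in for msnoise.core.db.connect(): the real
    # function derives the URL from db.ini (tech/hostname/database/...).
    engine = create_engine(url)
    Session = sessionmaker(bind=engine)
    return Session()
```

The returned object supports the usual Session API, e.g. `session.execute(text("SELECT 1"))`.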
- msnoise.core.db.create_database_inifile(tech, hostname, database, username, password, prefix='')
Creates the db.ini file based on supplied parameters.
- Parameters:
tech (int) – The database technology used: 1=sqlite 2=mysql
hostname (string) – The hostname of the server (if tech=2) or the name of the sqlite file (if tech=1)
database (string) – The database name
username (string) – The user name
prefix (string) – The prefix to use for all tables
password (string) – The password of user
- Returns:
None
- msnoise.core.db.get_engine(inifile=None)
Returns a SQLAlchemy Engine.
- msnoise.core.db.get_logger(name, loglevel=None, with_pid=False)
Returns the currently configured logger or configures a new one.
- msnoise.core.db.read_db_inifile(inifile=None)
Reads the parameters from the db.ini file.
- Parameters:
inifile (string) – The path to the db.ini file to use. Defaults to os.getcwd() + db.ini
- Return type:
collections.namedtuple
- Returns:
tech, hostname, database, username, password
Configuration & Params
MSNoise configuration management, parameter merging, and plot filename helpers.
- msnoise.core.config.build_plot_outfile(outfile, plot_name, lineage_names, *, pair=None, components=None, mov_stack=None, extra=None)
Resolve the output filename for a plot.
When outfile starts with "?", the "?" is replaced with a standardised auto-generated tag and plot_name is prepended, producing a filename of the form: <plot_name>__<lineage>[__<pair>][__<comp>][__m<ms>][__<extra>].<ext>
If outfile does not start with "?", it is returned unchanged (the caller supplied an explicit path). If outfile is None or empty, returns None.
- Parameters:
outfile – Raw outfile argument passed to the plot function (e.g. "?.png").
plot_name – Short plot identifier, e.g. "ccftime" or "dvv_mwcs". Must not contain __.
lineage_names – List of step name strings (e.g. ["preprocess_1", "cc_1", "filter_1", "stack_1"]) from MSNoiseResult.lineage_names.
pair – Optional station pair string ("NET.STA.LOC-NET.STA.LOC"). Dots are kept; colons are replaced with -.
components – Optional component string, e.g. "ZZ".
mov_stack – Optional moving-stack tuple or string, e.g. ("1D", "1D") → "m1D-1D".
extra – Optional extra qualifier string appended last.
- Returns:
Resolved filename string, or outfile unchanged.
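The "?" convention can be sketched in a few lines. This is a simplified reimplementation for illustration, assuming the lineage tag has already been built (see lineage_to_plot_tag()); the real helper also folds in the pair/components/mov_stack parts:

```python
def build_plot_outfile_sketch(outfile, plot_name, tag, extra=None):
    # Simplified stand-in for msnoise.core.config.build_plot_outfile().
    # `tag` is assumed to be the lineage tag from lineage_to_plot_tag().
    if not outfile:
        return None                  # None or empty -> None
    if not outfile.startswith("?"):
        return outfile               # explicit path: returned unchanged
    parts = [plot_name, tag]
    if extra:
        parts.append(extra)
    # "?" carries the extension, e.g. "?.png" -> ".png"
    return "__".join(parts) + outfile[1:]
```

For example, `build_plot_outfile_sketch("?.png", "ccftime", "pre1-cc1-f1-stk1")` yields `"ccftime__pre1-cc1-f1-stk1.png"`.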
- msnoise.core.config.create_config_set(session, set_name, set_number_hint=None)
Create a configuration set for a workflow step.
- Parameters:
session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()
set_name (str) – The name of the workflow step (e.g., ‘mwcs’, ‘mwcs_dtt’, etc.)
- Return type:
int or None
- Returns:
The set_number if set was created successfully, None otherwise
- msnoise.core.config.create_project_from_yaml(session, yaml_path)
Seed a freshly-initialised database from a project YAML file.
The project YAML uses category_N keys so that multiple config sets of the same category are unambiguous. Each entry may declare an after field (string or list of strings) naming the category_N steps that must precede it; one WorkflowLink row is created per after entry. Steps whose after target is absent from the YAML produce a warning instead of an error, so partial YAMLs and incremental DB builds are both supported.
Example:
msnoise_project_version: 1
global_1:
  startdate: "2013-04-01"
  enddate: "2014-10-31"
preprocess_1:
  after: global_1
  cc_sampling_rate: 20.0
cc_1:
  after: preprocess_1
  cc_type_single_station_AC: PCC
  whitening: "N"
filter_1:
  after: cc_1                    # single parent
  freqmin: 1.0
  freqmax: 2.0
  AC: "Y"
filter_2:
  after: cc_1                    # fan-out from the same cc step
  freqmin: 0.5
  freqmax: 1.0
  AC: "Y"
stack_1:
  after: [filter_1, filter_2]
  mov_stack: "(('2D','1D'))"
refstack_1:
  after: [filter_1, filter_2]    # sibling of stack, not child
  ref_begin: "2013-04-01"
  ref_end: "2014-10-31"
mwcs_1:
  after: [stack_1, refstack_1]   # list: requires both parents
  freqmin: 1.0
  freqmax: 2.0
- Parameters:
session – SQLAlchemy session (from connect()).
yaml_path – Path to a project YAML file.
- Returns:
(created_steps, warnings) — list of category_N strings created and list of warning strings for missing after targets.
- Raises:
ValueError – if the YAML is missing msnoise_project_version or a key is not in category_N format.
- msnoise.core.config.export_project_to_yaml(session, yaml_path, only_non_defaults=True)
Snapshot the current project DB state as a project YAML file.
The inverse of create_project_from_yaml(). Reads all active WorkflowStep rows, their config sets, the WorkflowLink topology, all DataSource rows, and all Station rows, and writes a msnoise_project_version: 1 YAML that can be re-applied with msnoise db init --from-yaml.
- Parameters:
session – SQLAlchemy session.
yaml_path – Destination path for the YAML file.
only_non_defaults – When True (default), only keys whose value differs from the CSV default are written, keeping the file minimal. Set to False to write every key explicitly.
- Returns:
Path string of the written file.
- msnoise.core.config.delete_config_set(session, set_name, set_number)
Delete a configuration set for a workflow step.
- Parameters:
session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()
set_name (str) – The name of the workflow step (e.g., ‘mwcs’, ‘mwcs_dtt’, etc.)
set_number (int) – The set number to delete
- Return type:
bool
- Returns:
True if set was deleted successfully, False otherwise
- msnoise.core.config.get_config(session, name=None, isbool=False, plugin=None, category='global', set_number=None)
Get the value of one or all config bits from the database.
- Parameters:
session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()
name (str) – The name of the config bit to get. If omitted, a dictionary with all config items will be returned
isbool (bool) – if True, returns True/False for config name. Defaults to False
plugin (str) – if provided, gives the name of the Plugin config to use. E.g. if “Amazing” is provided, MSNoise will try to load the “AmazingConfig” entry point. See Extending MSNoise with Plugins for details.
category (str) – The config category (default ‘global’).
set_number (int or None) – The config set number within the category (default None for global config).
- Return type:
str, bool, or dict
- Returns:
the value for name or a dict of all config values
- msnoise.core.config.get_config_categories_definition()
Return ordered display metadata for all workflow categories.
Each entry is a (category_key, display_name, level) tuple where level is the DAG depth (global=0, preprocess/psd=1 — both branch off global, …).
Derived from get_category_display_info() so plugin-added categories appear automatically. The list is ordered for UI rendering (CC branch first, PSD branch at end).
- msnoise.core.config.get_config_set_details(session, set_name, set_number, format='list')
Get details of a specific configuration set.
- Parameters:
session (sqlalchemy.orm.session.Session) – A Session object
set_name (str) – The category name
set_number (int) – The set number
- Return type:
list
- Returns:
List of config entries in the set
- msnoise.core.config.get_config_sets_organized(session)
Get configuration sets organized by category in the standard order
- msnoise.core.config.get_merged_params_for_lineage(db, orig_params, step_params, lineage)
Build a MSNoiseParams for the given lineage.
Returns (lineage, lineage_names, MSNoiseParams).
- msnoise.core.config.get_params(session)
Build a single-layer MSNoiseParams for global config.
Queries the Config table directly for all (category='global', set_number=1) rows and casts each value to its declared param_type. Does not depend on default.py.
For full pipeline params (per-step config included), use get_merged_params_for_lineage() instead.
- Parameters:
session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()
- Return type:
MSNoiseParams
- msnoise.core.config.lineage_to_plot_tag(lineage_names)
Convert a list of step names to a compact plot filename tag.
Each step is abbreviated using _STEP_ABBREVS and the set number is appended. Steps are joined with -.
Examples:
lineage_to_plot_tag(["preprocess_1", "cc_1", "filter_1", "stack_1"])
# → "pre1-cc1-f1-stk1"
lineage_to_plot_tag(["preprocess_1", "cc_1", "filter_1", "stack_1",
                     "refstack_1", "mwcs_1", "mwcs_dtt_1", "mwcs_dtt_dvv_1"])
# → "pre1-cc1-f1-stk1-ref1-mwcs1-dtt1-dvv1"
- Parameters:
lineage_names – List of step name strings from MSNoiseResult.lineage_names.
- Returns:
Hyphen-joined abbreviated tag string.
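The abbreviation logic amounts to splitting each step name into category and set number. A sketch, where the abbreviation table is inferred from the examples above (the real table, _STEP_ABBREVS, may contain more entries and is exposed via get_step_abbrevs()):

```python
# Abbreviations inferred from the documented examples; assumption,
# not the authoritative _STEP_ABBREVS mapping.
ABBREVS = {"preprocess": "pre", "cc": "cc", "filter": "f", "stack": "stk",
           "refstack": "ref", "mwcs": "mwcs", "mwcs_dtt": "dtt",
           "mwcs_dtt_dvv": "dvv"}

def lineage_to_plot_tag_sketch(lineage_names):
    parts = []
    for name in lineage_names:
        # "mwcs_dtt_1" -> category "mwcs_dtt", set number "1"
        category, set_number = name.rsplit("_", 1)
        parts.append(ABBREVS.get(category, category) + set_number)
    return "-".join(parts)
```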
- msnoise.core.config.list_config_sets(session, set_name=None)
List all configuration sets, optionally filtered by category.
- Parameters:
session (sqlalchemy.orm.session.Session) – A Session object
set_name (str or None) – Optional category filter (e.g., ‘mwcs’, ‘mwcs_dtt’, etc.)
- Return type:
list
- Returns:
List of tuples (category, set_number, entry_count)
- msnoise.core.config.parse_config_key(key)
Parse a dot-notation config key into (category, set_number, name).
Accepted forms:
output_folder          → ("global", 1, "output_folder")   # global shorthand
cc.cc_sampling_rate    → ("cc", 1, "cc_sampling_rate")
cc.2.cc_sampling_rate  → ("cc", 2, "cc_sampling_rate")
mwcs.2.mwcs_wlen       → ("mwcs", 2, "mwcs_wlen")
- Raises:
ValueError – for malformed keys (too many parts, non-integer set number, or bare name that is not a global parameter).
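The accepted forms map onto a small three-way parser. A sketch, with a placeholder GLOBAL_PARAMS set standing in for the real list of global parameter names:

```python
# Placeholder for the real list of global parameter names (assumption).
GLOBAL_PARAMS = {"output_folder"}

def parse_config_key_sketch(key):
    parts = key.split(".")
    if len(parts) == 1:            # bare name: global shorthand
        if parts[0] not in GLOBAL_PARAMS:
            raise ValueError(f"{key!r} is not a global parameter")
        return ("global", 1, parts[0])
    if len(parts) == 2:            # category.name -> set number 1
        return (parts[0], 1, parts[1])
    if len(parts) == 3:            # category.N.name
        try:
            set_number = int(parts[1])
        except ValueError:
            raise ValueError(f"non-integer set number in {key!r}")
        return (parts[0], set_number, parts[2])
    raise ValueError(f"too many parts in {key!r}")
```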
- msnoise.core.config.update_config(session, name, value, plugin=None, category='global', set_number=None)
Update one config bit in the database.
- Parameters:
session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()
name (str) – The name of the config bit to set.
value (str) – The value of parameter name. Can also be NULL if you don’t want to use this particular parameter.
plugin (str) – if provided, gives the name of the Plugin config to use. E.g. if “Amazing” is provided, MSNoise will try to load the “AmazingConfig” entry point. See Extending MSNoise with Plugins for details.
category (str) – The config category (default ‘global’). Use e.g. ‘stack’, ‘filter’, ‘mwcs’ to target a specific config set.
set_number (int or None) – The config set number within the category (default None for global config). Use e.g. 1 for stack_1.
Workflow & Jobs
MSNoise workflow topology, job management, lineage resolution and scheduling.
- msnoise.core.workflow.build_movstack_datelist(session)
Creates a date array for the analysis period. The returned tuple contains a start and an end date, and a list of individual dates between the two.
- Parameters:
session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()
- Return type:
tuple
- Returns:
(start, end, datelist)
- msnoise.core.workflow.build_ref_datelist(params, session=None)
Creates a date array for the REF. The returned tuple contains a start and an end date, and a list of individual dates between the two.
- Parameters:
params (AttribDict) – A params object as obtained by get_params().
session (sqlalchemy.orm.session.Session, optional) – A Session object, as obtained by connect(). Required only when ref_begin is "1970-01-01" (auto-detect from data availability).
- Return type:
tuple
- Returns:
(start, end, datelist)
- msnoise.core.workflow.compute_rolling_ref(data, ref_begin, ref_end)
Compute a per-index rolling reference from CCF data.
For each time index i, the reference is: mean(data[i + ref_begin : i + ref_end])
Both ref_begin and ref_end must be negative integers with ref_begin < ref_end <= -1. Use ref_end=-1 to exclude the current window (compare to the previous N windows).
Uses min_periods=1 semantics: the first few steps receive whatever partial window is available rather than NaN.
- Return type:
numpy.ndarray
- Returns:
Shape (n_times, n_lag_samples). Row i is the reference for time step i.
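The indexing rule can be sketched with plain NumPy. The edge handling below is an assumption mimicking the documented min_periods=1 semantics (the first rows fall back to whatever partial window exists); the real implementation may differ:

```python
import numpy as np

def rolling_ref_sketch(data, ref_begin, ref_end):
    # data: (n_times, n_lag_samples); ref_begin < ref_end <= -1
    out = np.empty_like(data, dtype=float)
    for i in range(data.shape[0]):
        lo = max(0, i + ref_begin)
        hi = max(lo + 1, i + ref_end)   # partial-window fallback (assumed)
        out[i] = data[lo:hi].mean(axis=0)
    return out
```

With ref_begin=-3 and ref_end=-1, row 5 of the output is the mean of rows 2 and 3.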
- msnoise.core.workflow.create_workflow_link(session, from_step_id, to_step_id, link_type='default')
Create a link between two workflow steps
- msnoise.core.workflow.create_workflow_links_from_steps(session)
Create workflow links automatically between existing workflow steps, following the DAG defined by get_workflow_chains().
Linking rule: every step in a source category is linked to every step in each of its successor categories (ALL×ALL). Set numbers carry no topological meaning — all combinations are created and the user removes unwanted links via the admin UI. The DAG is determined entirely by WorkflowLink rows.
- Returns:
tuple: (created_count, existing_count, error_message)
- msnoise.core.workflow.create_workflow_step(session, step_name, category, set_number, description=None)
Create a new workflow step
- msnoise.core.workflow.create_workflow_steps_from_config_sets(session)
Create workflow steps automatically from all existing config sets, sorted by natural workflow order.
- Returns:
tuple: (created_count, existing_count, error_message)
- msnoise.core.workflow.extend_days(days)
Return a DatetimeIndex from days extended by one extra day at the end.
Replaces the pandas 1.x pattern:
idx = pd.to_datetime(days)
idx = idx.append(pd.DatetimeIndex([idx[-1] + pd.Timedelta("1d")]))
which was removed in pandas 2.0.
- Parameters:
days – sequence of date-like values (strings, dates, datetimes…)
- Return type:
pandas.DatetimeIndex
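A minimal equivalent sketch using the current pandas API (Index.union() instead of the old append pattern); this is an illustration, not the library's own implementation:

```python
import pandas as pd

def extend_days_sketch(days):
    # Sketch of msnoise.core.workflow.extend_days(): convert to a
    # DatetimeIndex and add one extra day after the last entry.
    idx = pd.to_datetime(list(days))
    return idx.union([idx[-1] + pd.Timedelta("1d")])
```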
- msnoise.core.workflow.get_done_lineages_for_category(session, category)
Return all distinct computed lineages for a given workflow step category.
Queries Job rows whose associated WorkflowStep.category matches category and whose flag is 'D' (done), then de-duplicates and resolves each lineage string into an ordered list of step-name strings (upstream → downstream, including the step itself).
This is the correct way to enumerate output folders for a step that may be reached via multiple upstream paths (e.g. multiple filters, multiple MWCS configs), because it reflects what was actually computed rather than what the DAG topology suggests.
Example:
lineages = get_done_lineages_for_category(db, 'stretching')
# → [
#   ['preprocess_1', 'cc_1', 'filter_1', 'stack_1', 'stretching_1'],
#   ['preprocess_1', 'cc_1', 'filter_2', 'stack_1', 'stretching_1'],
# ]
- msnoise.core.workflow.get_filter_steps_for_cc_step(session, cc_step_id)
Get all filter steps that are children of a specific CC step.
- Parameters:
session – Database session
cc_step_id – The step_id of the CC step
- Returns:
List of filter workflow steps that are successors of the CC step
- msnoise.core.workflow.get_job_types(session, jobtype)
Return job counts grouped by flag for a given jobtype string.
Works with the v2 workflow model where jobtype is a step name such as "cc_1". Returns a list of (count, flag) tuples.
- Parameters:
session (sqlalchemy.orm.session.Session) – A Session object
jobtype (str) – Step name to query (e.g. "cc_1")
- Return type:
list
- Returns:
List of (count, flag) pairs
- msnoise.core.workflow.get_lineages_to_step_id(session, step_id, include_self=True, max_depth=50, max_paths=5000)
Enumerate upstream lineages (all distinct paths) ending at step_id.
Returns a list of paths, each path being a list[WorkflowStep] ordered upstream -> downstream. This preserves branch structure (unlike get_upstream_steps_for_step_id which de-duplicates nodes).
- Safety:
max_depth prevents infinite loops in case of bad/cyclic graphs
max_paths prevents combinatorial explosion
- Parameters:
session (sqlalchemy.orm.session.Session)
step_id (int)
include_self (bool) – If True, each path ends with the step itself.
max_depth (int)
max_paths (int)
- Returns:
- list[list[WorkflowStep]]
- msnoise.core.workflow.get_next_job_for_step(session, step_category='preprocess', flag='T', group_by='day', limit_days=None, chunk_size=0)
Return a claimed batch of jobs for a workflow step category.
Note
Most callers should use get_next_lineage_batch() instead, which wraps this function and also resolves lineage strings, params, and station-pair metadata into a ready-to-use batch dict.
- group_by:
“day”: claim all jobs for the selected (step_id, jobtype, day)
“pair”: claim all jobs for the selected (step_id, jobtype, pair)
“pair_lineage”: claim all jobs for the selected (step_id, jobtype, pair, lineage)
“day_lineage”: claim all jobs for the selected (step_id, jobtype, day, lineage)
- chunk_size:
Only applies to group_by="day_lineage". When > 0, at most chunk_size jobs are claimed per batch instead of the full day. Useful for steps with O(N²) work per day (CC) or large per-day station counts (PSD) so that multiple workers can share the same day in parallel without write conflicts. chunk_size=0 (default) claims everything — identical to the original behaviour.
- msnoise.core.workflow.get_next_lineage_batch(db, step_category, group_by='pair_lineage', loglevel='INFO', day_value=None, chunk_size=0)
Standard worker prolog for lineage-aware steps.
Claims the next batch of jobs for step_category using get_next_job_for_step.
Extracts (pair, lineage_str, refs, days).
Loads current step config (from the job row).
Resolves lineage_str -> WorkflowStep objects.
Builds a MSNoiseParams for that lineage (one layer per category).
- Parameters:
chunk_size (int, optional) – Passed to get_next_job_for_step(). Only effective for group_by="day_lineage". When > 0, at most chunk_size jobs are claimed per batch, enabling multiple workers to share the same day in parallel. Default 0 = claim everything (original behaviour).
- Returns:
- dict with keys:
jobs, step
pair, lineage_str, lineage_steps
lineage_names — full list including current step name
lineage_names_upstream — full minus current step (replaces manual [:-1])
- lineage_names_mov — upstream with any refstack_* entries stripped
(used by mwcs/wct/stretching to find MOV CCFs)
refs, days
step_params, params — params is a MSNoiseParams instance
- or None if no jobs were claimed (caller should continue/sleep).
- msnoise.core.workflow.get_refstack_lineage_for_filter(session, filterid, refstack_set_number=1)
Get the full lineage path from root through a filter step to its refstack.
Refstack is now a sibling of stack (both children of the filter pass-through). Returns the path ending at refstack_M directly downstream of filter_{filterid} — suitable for xr_get_ref().
Example for the default single-pipeline:
get_refstack_lineage_for_filter(db, 1)
# → ['preprocess_1', 'cc_1', 'filter_1', 'refstack_1']
- msnoise.core.workflow.get_t_axis(params)
Returns the time axis (in seconds) of the CC functions.
- Return type:
numpy.array
- Returns:
the time axis in seconds
- msnoise.core.workflow.get_workflow_job_counts(db)
Get job counts by status for the workflow
- Parameters:
db – Database connection
- Returns:
Dictionary with job counts by status
- msnoise.core.workflow.get_workflow_links(session)
Get all links in a workflow
- msnoise.core.workflow.get_workflow_steps(session)
Get all steps in a workflow
- msnoise.core.workflow.is_next_job_for_step(session, step_category='preprocess', flag='T')
Are there any workflow-aware Jobs in the database with the specified flag and step category?
- Parameters:
session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()
step_category (str) – Workflow step category (e.g., “preprocess”, “qc”)
flag (str) – Status of the Job: “T”odo, “I”n Progress, “D”one.
- Return type:
bool
- Returns:
True if at least one Job matches the criteria, False otherwise.
- msnoise.core.workflow.lineage_str_to_step_names(lineage_str, sep='/')
Convert a lineage string like “preprocess_1/cc_1/filter_1” into a list of step_name strings, preserving order.
- msnoise.core.workflow.lineage_str_to_steps(session, lineage_str, sep='/', strict=True)
Resolve a lineage string to a list of WorkflowStep ORM objects (ordered).
- Parameters:
session (sqlalchemy.orm.session.Session)
lineage_str (str) – e.g. “preprocess_1/cc_1/filter_1”
sep (str) – Separator used in lineage strings.
strict (bool) – If True, raises if any step_name can’t be resolved. If False, silently skips missing steps.
- Returns:
- list[WorkflowStep]
- msnoise.core.workflow.massive_insert_job(session, jobs)
Bulk-insert a list of job dicts into the jobs table.
Each dict must contain at minimum day, pair, jobtype, flag and lastmod keys. Optional keys: step_id, priority, lineage.
- Parameters:
session (sqlalchemy.orm.session.Session) – A Session object
jobs (list[dict]) – Job records to insert.
- msnoise.core.workflow.massive_update_job(session, jobs, flag='D')
Routine using a low-level function to update a list of Job much faster. This method uses the Job.ref, which is unique.
Note: If the session becomes invalid, will attempt to get a fresh session from the engine.
- Parameters:
session (Session) – the database connection object
jobs (list or tuple) – a list of Job to update.
flag (str) – The destination flag.
- msnoise.core.workflow.propagate_downstream(session, batch: dict) → int
Propagate a just-completed batch to all immediate downstream worker steps.
Called immediately after massive_update_job() marks a batch Done, only when params.global_.hpc is False (the default). In HPC mode the operator runs msnoise new_jobs --after <category> manually.
Delegates to the specialised propagate_* functions from s02_new_jobs for transitions that require non-trivial logic (fan-outs, sentinel jobs, etc.). For simple pair-preserving transitions it performs a targeted bulk upsert keyed on the batch’s exact (pair, day) tuples.
- Parameters:
session (sqlalchemy.orm.Session)
batch (dict) – Return value of get_next_lineage_batch().
- Return type:
int
- Returns:
Total jobs created or bumped to T.
- msnoise.core.workflow.refstack_is_rolling(params)
Return True if the refstack configset uses rolling-index mode.
Rolling mode is indicated by ref_begin being a negative integer string (e.g. "-5"). In this mode no REF file is written to disk; the reference is computed on-the-fly at MWCS/stretching/WCT time via compute_rolling_ref().
- Parameters:
params (obspy.core.AttribDict) – Merged parameter set containing ref_begin.
- Return type:
bool
- msnoise.core.workflow.refstack_needs_recompute(session, pair, cc_lineage_prefix, params)
Return True if the REF stack needs to be recomputed for this pair.
Queries Done stack jobs for pair across all active stack_N steps whose lineage starts with cc_lineage_prefix (e.g. ["preprocess_1", "cc_1", "filter_1"]), and checks whether any date falls inside the configured [ref_begin, ref_end] window.
Stack and refstack are now siblings (both children of filter/cc), so the refstack worker no longer has a stack step in its upstream lineage. Instead, the cc/filter prefix is shared by both branches and is used here to locate the sibling stack jobs.
Should only be called for Mode A (fixed-date) refstack jobs.
- Parameters:
session – SQLAlchemy session.
pair – Station pair string "NET.STA.LOC:NET.STA.LOC".
cc_lineage_prefix – Lineage name list up to (but not including) the stack/refstack level, e.g. ["preprocess_1", "cc_1", "filter_1"]. Pass batch["lineage_names_upstream"] from the refstack worker — that list ends at filter_N (the pass-through above refstack).
params – MSNoiseParams for this lineage.
- Return type:
bool
- msnoise.core.workflow.reset_jobs(session, jobtype, alljobs=False, reset_i=True, reset_e=True, downstream=False)
Reset jobs back to “T”odo.
jobtype can be:
A specific step name, e.g. "cc_1" — resets only that step.
A category name, e.g. "cc" — resets all active steps in that category (cc_1, cc_2, …).
If downstream is True, all steps reachable via WorkflowLink from the resolved step(s) are also reset (breadth-first, respects is_active).
- Parameters:
session (sqlalchemy.orm.session.Session) – A Session object
jobtype (str) – Step name or category name to reset.
alljobs (bool) – If True reset all jobs regardless of current flag; otherwise only resets "I" and/or "F" flagged jobs.
reset_i (bool) – Reset “I”n-progress jobs (default True).
reset_e (bool) – Reset “E”rror/failed jobs (default True).
downstream (bool) – Also reset every step reachable downstream via active WorkflowLink rows (default False).
- msnoise.core.workflow.resolve_lineage_params(session, lineage_names)
Resolve a lineage name-list into a fully merged params object.
Given a list of step-name strings (as returned by get_done_lineages_for_category()), resolves them to WorkflowStep ORM objects and merges every step’s configuration into the global params, exactly as the processing steps themselves do via get_next_lineage_batch().
Returns (lineage_steps, lineage_names, params) — the same tuple as get_merged_params_for_lineage() — so callers can use params directly (it will have components_to_compute, mov_stack, etc.).
Example:
lineage_names = get_done_lineages_for_category(db, 'mwcs_dtt')[0]
_, _, params = resolve_lineage_params(db, lineage_names)
mov_stack = params.stack.mov_stack[0]
- Parameters:
lineage_names – Ordered list of step-name strings, e.g. ['preprocess_1', 'cc_1', 'filter_1', 'stack_1', 'mwcs_1', 'mwcs_dtt_1'].
- Return type:
tuple(list, list[str], MSNoiseParams)
- msnoise.core.workflow.update_job(session, day, pair, jobtype, flag, step_id=None, priority=0, lineage=None, commit=True, returnjob=True, ref=None)
Updates or inserts a Job in the database. Workflow-aware: handles step_id and lineage fields.
- Parameters:
session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()
day (str) – The day in YYYY-MM-DD format
pair (str) – the name of the station pair
jobtype (str) – Job type string, e.g. "preprocess_1"
flag (str) – Status of the Job: “T”odo, “I”n Progress, “D”one, “F”ailed.
step_id (int or None) – WorkflowStep primary key, or None
priority (int) – Job priority (default 0)
lineage (str or None) – Lineage string encoding upstream configset chain
commit (bool) – Whether to directly commit (True, default) or not (False)
returnjob (bool) – Return the modified/inserted Job (True, default) or not (False)
ref (int or None) – If provided, look up the job by its primary key instead of (day, pair, jobtype, lineage).
- Return type:
Job or None
- Returns:
If returnjob is True, returns the modified/inserted Job.
- msnoise.core.workflow.filter_within_daterange(date, start_date, end_date)
Check if a date falls within the configured range
- msnoise.core.workflow.get_first_runnable_steps_per_branch(session, source_step_id, skip_categories=None)
Return the first runnable step on each outgoing branch from source_step_id, skipping steps in skip_categories (default: {“filter”}).
“Branch” roots are the immediate successors of source_step_id.
If skipped nodes fan out, each fan-out is treated as a separate branch.
Results are deduped (set behavior) and returned in stable order.
- msnoise.core.workflow.get_step_successors(session, step_id)
Get all steps that this step feeds into
- msnoise.core.workflow.get_workflow_graph(session)
Return workflow as nodes and edges for visualization, sorted by workflow order
- msnoise.core.workflow.get_workflow_chains()
Return the full workflow adjacency map, including any plugin extensions.
Merges the built-in topology with addenda registered via the msnoise.plugins.workflow_chains entry point group. Each registered callable must return a dict of the same schema as the built-in chains:
{
    "my_step": {
        "next_steps": ["other_step"],
        "is_entry_point": False,
        "is_terminal": False,
    }
}
Plugin entries are merged in load order; later plugins can override earlier ones for the same category key.
- msnoise.core.workflow.get_workflow_order()
Return the canonical category display order, including plugin extensions.
Merges the built-in order with any extra categories registered via the msnoise.plugins.workflow_order entry point group. Each registered callable must return a list of category name strings in the desired insertion order (appended after the built-in list).
- Return type:
list[str]
- msnoise.core.workflow.get_step_abbrevs()
Return a mapping of workflow category → short plot-filename abbreviation.
Merges the built-in abbreviations (from _BUILTIN_WORKFLOW_CHAINS) with any abbrev keys provided by plugin entry points via msnoise.plugins.workflow_chains. Falls back to the raw category name for any category without an abbrev entry.
- msnoise.core.workflow.get_category_display_info()
Return ordered display metadata for all workflow categories.
Each entry is a dict with keys category, display_name, level (DAG depth: global=0, preprocess/psd=1, cc=2, …). Ordered for UI rendering: the main CC branch first (preprocess→cc→…→dvv), then the PSD branch (psd→psd_rms). Plugin categories are appended at the end.
- Return type:
list[dict]
- msnoise.core.workflow.is_terminal_category(category)
Return True if category is a terminal workflow step (no successors).
Terminal categories are those with an empty next_steps list in get_workflow_chains(). Examples: psd_rms, mwcs_dtt_dvv.
- Parameters:
category – Workflow category string.
- Return type:
bool
- msnoise.core.workflow.is_entry_category(category)
Return True if category is a DAG entry point (no predecessors).
Entry categories are those with is_entry_point: True in get_workflow_chains(). Currently only global.
- Parameters:
category – Workflow category string.
- Return type:
bool
- msnoise.core.workflow.query_active_steps(session, categories)
Return all active WorkflowStep rows whose category is in categories.
- Parameters:
categories – A single category string or an iterable of strings.
- Return type:
list[WorkflowStep]
- msnoise.core.workflow.resolve_lineage_ids(session, lids)
Batch-resolve a set of lineage_ids to their lineage strings.
- msnoise.core.workflow.bulk_upsert_jobs(session, to_insert, to_bump_refs, now, *, bump_flag_filter=None)
Bulk-insert new jobs and bump existing ones back to "T".
- Parameters:
to_insert – List of job dicts (keys: day, pair, jobtype, step_id, priority, flag, lastmod, lineage_id).
to_bump_refs – List of Job.ref primary-key values to reset to T.
now – datetime used for lastmod on bumped rows.
bump_flag_filter – If given, only rows whose current flag equals this value are bumped. Pass None to bump unconditionally.
- Returns:
(n_inserted, n_bumped)
- Return type:
tuple
- msnoise.core.workflow.get_category_cli_command(category: str) → list | None
Return the msnoise sub-command token list for category, or None.
Reads the "cli" key from get_workflow_chains() so that the mapping stays in the single source of truth. None means the category is a pass-through (no worker, no jobs).
Plugin packages extend the mapping by adding a "cli" key to the dict returned by their msnoise.plugins.workflow_chains entry point — no separate entry point is needed.
- Parameters:
category – Workflow category string (e.g. "cc", "stack").
- Return type:
list[str] or None
- msnoise.core.workflow.run_workflow_plan(session, *, threads: int = 1, hpc: bool | None = None, from_category: str | None = None, until_category: str | None = None, chunk_size: int = 0) → list
Build an ordered execution plan for all active workflow steps.
Performs a topological sort of active WorkflowStep categories via WorkflowLink rows and returns an ordered list of RunStep namedtuples. Pass-through categories (filter, global) are omitted. Categories with no runnable command (see get_category_cli_command()) are also omitted.
The returned plan is pure data — no subprocesses are started. The CLI command msnoise utils run_workflow drives the actual execution.
- Parameters:
session – SQLAlchemy session.
threads – Number of parallel worker threads/processes (-t N).
hpc – Override hpc flag from config. If None, reads from params.global_.hpc.
from_category – Skip all categories before this one (inclusive start).
until_category – Stop after this category (inclusive end).
chunk_size – --chunk-size value passed to cc compute and qc compute_psd.
- Returns:
Ordered list of RunStep namedtuples.
- Return type:
list[RunStep]
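The topological ordering over the category DAG can be sketched with Kahn's algorithm. This is a generic sketch over an adjacency map of the shape returned by get_workflow_chains(); the real planner works on WorkflowStep/WorkflowLink rows and additionally drops pass-through categories:

```python
from collections import deque

def topo_order_sketch(chains):
    # chains: {category: {"next_steps": [...]}} adjacency map
    indeg = {c: 0 for c in chains}
    for info in chains.values():
        for nxt in info["next_steps"]:
            indeg[nxt] = indeg.get(nxt, 0) + 1
    queue = deque(c for c, d in indeg.items() if d == 0)
    order = []
    while queue:
        cat = queue.popleft()
        order.append(cat)
        for nxt in chains.get(cat, {"next_steps": []})["next_steps"]:
            indeg[nxt] -= 1
            if indeg[nxt] == 0:
                queue.append(nxt)
    return order
```

For the chain global → preprocess → cc this yields ["global", "preprocess", "cc"].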
I/O
MSNoise xarray I/O for all result types (CCF, MWCS, DTT, STR, WCT, DVV, PSD).
- msnoise.core.io.aggregate_dvv_pairs(root, parent_lineage, parent_step_name, parent_category: str, mov_stack, component: str, pair_type: str, pairs, params) → Dataset
Aggregate per-pair DTT/STR/WCT-DTT results into network-level dv/v statistics.
Reads all per-pair output files for the given (mov_stack, component) combination, extracts the appropriate dv/v and error columns per pair type, applies quality filtering, then computes across-pair statistics at each time step.
- Parameters:
root – Output folder root.
parent_lineage – Lineage name list up to and including the parent DTT step (e.g. ["preprocess_1", ..., "mwcs_dtt_1"]).
parent_step_name – Step name of the parent DTT step.
parent_category – "mwcs_dtt", "stretching", or "wavelet_dtt".
mov_stack – Tuple (window, step), e.g. ("1D", "1D").
component – Component string, e.g. "ZZ".
pair_type – "CC", "SC", "AC", or "ALL".
pairs – Iterable of (sta1, sta2) SEED-id string tuples.
params – Params object from get_params().
- Returns:
xarray.Dataset with dim times and stat variables.
- Raises:
ValueError – if no data files are found for the given combination.
- msnoise.core.io.psd_ppsd_to_dataframe(ppsd)
- msnoise.core.io.psd_ppsd_to_dataset(ppsd)
Convert an ObsPy PPSD object to an xarray.Dataset.
Builds the same data as psd_ppsd_to_dataframe() but returns an xr.Dataset with a PSD variable of dims (times, periods) — ready to pass directly to xr_save_psd() without a DataFrame round-trip.
- Parameters:
ppsd – PPSD object.
- Returns:
xarray.Dataset.
- msnoise.core.io.psd_read_results(net, sta, loc, chan, datelist, format='PPSD', use_cache=True)
- msnoise.core.io.save_daily_ccf(root, lineage, step_name, station1, station2, components, date, corr, taxis)
Save a daily-stacked CCF to NetCDF using xr_save_ccf_daily().
Replaces the legacy add_corr() / _export_mseed / _export_sac pipeline. Output lives at:
<root>/<lineage>/<step_name>/_output/daily/<components>/<sta1>_<sta2>/<date>.nc
- Parameters:
root – Output folder (params.global_.output_folder).
lineage – List of step-name strings for the lineage path.
step_name – Current step name (e.g. "cc_1").
station1 – First station SEED id NET.STA.LOC.
station2 – Second station SEED id NET.STA.LOC.
components – Component pair string, e.g. "ZZ".
date – Calendar date (datetime.date or ISO string).
corr – 1-D numpy array, the stacked CCF.
taxis – 1-D lag-time axis array.
- msnoise.core.io.xr_get_ccf(root, lineage, station1, station2, components, mov_stack, taxis)
Load CCF results from a NetCDF file.
- Returns:
xarray.DataArray with dims (times, taxis), lazily memory-mapped. Call .load() or .values when you need numpy.
- Raises:
FileNotFoundError – if the NetCDF file does not exist.
Note
xr_get_ccf (readers: s05/s08/s10) and xr_save_ccf (writer: s04) operate on strictly disjoint workflow steps — the file is never read and written concurrently, so keeping the handle lazy is safe.
- msnoise.core.io.xr_get_ccf_all(root, lineage, step_name, station1, station2, components, date)
Load all windowed CCFs for one day written by xr_save_ccf_all().
- Returns:
xarray.DataArray with dims (times, taxis).
- Raises:
FileNotFoundError – if the NetCDF file does not exist.
- msnoise.core.io.xr_get_ccf_daily(root, lineage, step_name, station1, station2, components, date)
Load a single daily-stacked CCF written by xr_save_ccf_daily().
- Returns:
xarray.DataArray with dim taxis.
- Raises:
FileNotFoundError – if the NetCDF file does not exist.
- msnoise.core.io.xr_get_dtt(root, lineage, station1, station2, components, mov_stack)
Load DTT results from a NetCDF file.
- Returns:
xarray.Dataset with a DTT variable and dims (times, keys).
- Raises:
FileNotFoundError – if the NetCDF file does not exist.
- msnoise.core.io.xr_get_dvv_agg(root, lineage, step_name, mov_stack, pair_type: str, component: str)
Load a DVV aggregate result from a NetCDF file.
- Returns:
xarray.Dataset (lazy).
- Raises:
FileNotFoundError – if the file does not exist.
- msnoise.core.io.xr_get_dvv_pairs(root, lineage, step_name, mov_stack, pair_type: str, component: str)
Load per-pair dv/v time series from a NetCDF file.
- Returns:
xarray.Dataset with dims (pair, times) and variables dvv and err.
- Raises:
FileNotFoundError – if the file does not exist.
- msnoise.core.io.xr_get_mwcs(root, lineage, station1, station2, components, mov_stack)
Load MWCS results from a NetCDF file.
- Returns:
xarray.Dataset with a MWCS variable and dims (times, taxis, keys).
- Raises:
FileNotFoundError – if the NetCDF file does not exist.
- msnoise.core.io.xr_get_ref(root, lineage, station1, station2, components, taxis, ignore_network=False)
- msnoise.core.io.xr_get_wct_dtt(root, lineage, station1, station2, components, mov_stack)
Load per-pair WCT dt/t results from a NetCDF file.
Returns an xarray.Dataset with variables DTT, ERR, COH, each of shape (times, frequency), as written by xr_save_wct_dtt().
- Raises:
FileNotFoundError – if the NetCDF file does not exist.
- Return type:
xarray.Dataset
- msnoise.core.io.xr_load_ccf_for_stack(root, lineage_names, station1, station2, components, dates)
Load per-day CCF data for stacking — the higher-level replacement for get_results_all.
Reads the daily CCF NetCDF files written by s03_compute_no_rotation() for the requested station pair, component and date list. Tries keep_all (per-window, _output/all/) first; falls back to keep_days (daily stacks, _output/daily/) if the former are absent.
Path layout (written by s03):
<root> / *cc_lineage / filter_step / _output / all|daily / <comp> / <sta1>_<sta2> / <date>.nc
Data are stored internally as float32 to halve the working-set size relative to the float64 values written by s03. Precision loss is negligible for the rolling-mean operations performed by the stack worker.
- Parameters:
root – Output folder (params.global_.output_folder).
lineage_names – Full lineage name list including the filter step, e.g. ["preprocess_1", "cc_1", "filter_1", "stack_1"].
station1 – First station SEED id NET.STA.LOC.
station2 – Second station SEED id NET.STA.LOC.
components – Component pair string, e.g. "ZZ".
dates – Iterable of datetime.date or ISO date strings.
- Returns:
xarray.Dataset with variable CCF and dims (times, taxis), sorted by times. Empty Dataset if no data found.
- msnoise.core.io.xr_load_psd(root, lineage, step_name, seed_id, day)
Load a daily PSD NetCDF written by xr_save_psd().
- Parameters:
format (str) – "dataframe" returns a DataFrame or None if file not found; "dataset" (default) returns an xarray.Dataset or None.
- msnoise.core.io.xr_load_rms(root, lineage, step_name, seed_id)
Load per-station PSD RMS results from a NetCDF file.
- Parameters:
format (str) – "dataframe" returns a DataFrame or None if file not found; "dataset" (default) returns an xarray.Dataset or None.
- msnoise.core.io.xr_load_wct(root, lineage, station1, station2, components, mov_stack)
Load WCT results from a NetCDF file.
- Returns:
xarray.Dataset with variables WXamp, Wcoh, WXdt and dims (times, freqs, taxis).
- Raises:
FileNotFoundError – if the NetCDF file does not exist.
Pure read — xr_save_wct_dtt() writes to a different path, so keeping this handle lazy is safe.
- msnoise.core.io.xr_save_ccf(root, lineage, step_name, station1, station2, components, mov_stack, taxis, new, overwrite=False)
- msnoise.core.io.xr_save_ccf_all(root, lineage, step_name, station1, station2, components, date, window_times, taxis, corrs)
Save all windowed CCFs for one day as a single NetCDF file.
Replaces the legacy HDF5-based export_allcorr. One file per calendar day — safe for concurrent workers since each worker owns exactly one day.
Path layout:
<root>/<lineage>/<step_name>/_output/all/<components>/<sta1>_<sta2>/<date>.nc
- Parameters:
window_times – 1-D array of window start times (datetime strings or datetime objects) — the times dimension.
taxis – 1-D lag-time axis array — the taxis dimension.
corrs – 2-D numpy array (n_windows, n_taxis) of CCF data.
- msnoise.core.io.xr_save_ccf_daily(root, lineage, step_name, station1, station2, components, date, taxis, corr)
Save a single daily-stacked CCF as a per-day NetCDF file.
One file per calendar day — safe for concurrent workers since each worker owns exactly one day.
Path layout:
<root>/<lineage>/<step_name>/_output/daily/<components>/<sta1>_<sta2>/<YYYY-MM-DD>.nc
- Parameters:
corr – 1-D numpy array of the stacked CCF.
taxis – 1-D lag-time axis array.
date – datetime.date or ISO string "YYYY-MM-DD".
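For illustration, the documented layout maps onto a small path helper (a hypothetical function; the real path construction lives inside the io module):

```python
def daily_ccf_path(root, lineage, step_name, components, sta1, sta2, date):
    # <root>/<lineage>/<step_name>/_output/daily/<components>/<sta1>_<sta2>/<date>.nc
    parts = [root, *lineage, step_name, "_output", "daily",
             components, f"{sta1}_{sta2}", f"{date}.nc"]
    return "/".join(parts)
```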
- msnoise.core.io.xr_save_dtt(root, lineage, step_name, station1, station2, components, mov_stack, dataset)
Save DTT results to a NetCDF file.
- Parameters:
dataset – xarray.Dataset with a DTT variable and dims (times, keys), as built by s06_compute_mwcs_dtt.
- msnoise.core.io.xr_save_dvv_agg(root, lineage, step_name, mov_stack, pair_type: str, component: str, dataset)
Save a DVV aggregate result to a NetCDF file.
Path layout:
<root>/<lineage>/<step_name>/_output/<mov_stack>/dvv_<pair_type>_<component>.nc
- Parameters:
dataset – xarray.Dataset with dim times and stat variables, as built by aggregate_dvv_pairs().
- msnoise.core.io.xr_save_dvv_pairs(root, lineage, step_name, mov_stack, pair_type: str, component: str, dataset)
Save per-pair dv/v time series to a NetCDF file.
Path layout:
<root>/<lineage>/<step_name>/_output/<mov_stack>/dvv_pairs_<pair_type>_<component>.nc
- Parameters:
dataset – xarray.Dataset with dims (pair, times) and variables dvv and err, as built by aggregate_dvv_pairs().
- msnoise.core.io.xr_save_mwcs(root, lineage, step_name, station1, station2, components, mov_stack, taxis, dataset)
Save MWCS results to a NetCDF file.
- Parameters:
dataset – xarray.Dataset with a MWCS variable and dims (times, taxis, keys), as built by s05_compute_mwcs.
- msnoise.core.io.xr_save_psd(root, lineage, step_name, seed_id, day, dataset)
Save a daily PSD result to a NetCDF file.
Path layout:
<root>/<lineage>/<step_name>/_output/daily/<seed_id>/<YYYY-MM-DD>.nc
- Parameters:
dataset – xarray.Dataset with a PSD variable and dims (times, periods), as built by psd_ppsd_to_dataset().
- msnoise.core.io.xr_save_ref(root, lineage, step_name, station1, station2, components, taxis, new, overwrite=False)
- msnoise.core.io.xr_save_rms(root, lineage, step_name, seed_id, dataframe)
Save per-station PSD RMS results to a NetCDF file.
Accepts either a DataFrame (legacy; index = DatetimeIndex, columns = band labels) or an xarray.Dataset with a RMS variable and dims (times, bands).
- msnoise.core.io.xr_save_stretching(root, lineage, step_name, station1, station2, components, mov_stack, dataset)
Save per-pair stretching results to a NetCDF file.
Path layout:
<root>/<lineage>/<step_name>/_output/<mov_stack[0]>_<mov_stack[1]>/<components>/<sta1>_<sta2>.nc
- Parameters:
dataset – xarray.Dataset with a STR variable and dims (times, keys), where keys = ['Delta', 'Coeff', 'Error'], as built by s10_stretching.
- msnoise.core.io.xr_save_wct(root, lineage, step_name, station1, station2, components, mov_stack, dataset)
Save WCT results to a NetCDF file.
- Parameters:
dataset – xarray.Dataset with variables WXamp, Wcoh, WXdt and dims (times, freqs, taxis), as built by s08_compute_wct.
- msnoise.core.io.xr_save_wct_dtt(root, lineage, step_name, station1, station2, components, mov_stack, taxis, dataset)
Save WCT-DTT results to a NetCDF file.
- Parameters:
dataset – xarray.Dataset with variables DTT, ERR, COH and dims (times, frequency), as built by s09_compute_wct_dtt.
Signal Processing
MSNoise signal processing, preprocessing helpers, and stacking utilities.
- msnoise.core.signal.check_and_phase_shift(trace, taper_length=20.0)
- msnoise.core.signal.compute_wct_dtt(freqs, tvec, WXamp, Wcoh, delta_t, lag_min=5, coda_cycles=20, mincoh=0.5, maxdt=0.2, min_nonzero=0.25, freqmin=0.1, freqmax=2.0)
Compute dv/v and associated errors from WCT results following Mao et al.[1].
For each frequency band, fits a weighted linear regression delta_t(t) = -(dv/v) * t using log-amplitude weights from the cross-wavelet spectrum (their eq. 3–4).
- Parameters:
freqs – Frequency values from the WCT.
tvec – Time axis.
WXamp – Cross-wavelet amplitude array (freqs × taxis).
Wcoh – Wavelet coherence array (freqs × taxis).
delta_t – Time delay array (freqs × taxis).
lag_min – Minimum coda lag in seconds.
coda_cycles – Number of periods to use as coda window width.
mincoh – Minimum coherence threshold.
maxdt – Maximum allowed time delay.
min_nonzero – Minimum fraction of valid (non-zero weight) samples required.
freqmin – Lower frequency bound for regression.
freqmax – Upper frequency bound for regression.
- Returns:
Tuple of (dt/t, err, weighting_function).
- msnoise.core.signal.find_segments(data, gap_threshold)
Identify continuous non-NaN segments in an xarray DataArray.
- Parameters:
data – 2-D xarray DataArray (times × lags).
gap_threshold – Maximum index gap before treating as a new segment.
- Returns:
List of lists of row indices forming each continuous segment.
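The grouping logic can be sketched in pure Python, assuming the valid (non-NaN) row indices have already been extracted (the real function takes the DataArray itself):

```python
def group_segments(valid_rows, gap_threshold):
    # Start a new segment whenever the gap between consecutive
    # row indices exceeds gap_threshold.
    segments = [[valid_rows[0]]]
    for idx in valid_rows[1:]:
        if idx - segments[-1][-1] > gap_threshold:
            segments.append([idx])
        else:
            segments[-1].append(idx)
    return segments
```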
- msnoise.core.signal.getCoherence(dcs, ds1, ds2)
Compute cross-coherence between two spectra.
- Parameters:
dcs – Cross-spectrum magnitudes (1-D array, length n).
ds1 – Auto-spectrum of signal 1 (1-D array, length n).
ds2 – Auto-spectrum of signal 2 (1-D array, length n).
- Returns:
Complex coherence array of length n, clipped to |coh| <= 1.
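A sketch of this relation in numpy (a hypothetical helper name; it guards zero auto-spectra and clips the magnitude at 1):

```python
import numpy as np

def coherence(dcs, ds1, ds2):
    # |coh| = |X12| / sqrt(S1 * S2); zero where either auto-spectrum
    # vanishes, magnitude clipped to 1.
    coh = np.zeros(len(dcs), dtype=complex)
    valid = (ds1 > 0) & (ds2 > 0)
    coh[valid] = dcs[valid] / np.sqrt(ds1[valid] * ds2[valid])
    mag = np.abs(coh)
    over = mag > 1.0
    coh[over] /= mag[over]
    return coh
```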
- msnoise.core.signal.getGaps(stream, min_gap=None, max_gap=None)
- msnoise.core.signal.get_preprocessed_stream(output_dir, step_name, goal_day, stations)
Read per-station preprocessed files and return a merged Stream.
Counterpart to save_preprocessed_streams(). Reads only the station files needed for stations (a list of "NET.STA.LOC" strings) and returns them merged into a single Stream.
Missing station files are silently skipped (the cc worker checks for empty streams downstream).
- Parameters:
output_dir – Base output directory.
step_name – Workflow step name (e.g. "preprocess_1").
goal_day – Processing date string (YYYY-MM-DD).
stations – List of "NET.STA.LOC" strings to load.
- Returns:
Merged Stream object.
- msnoise.core.signal.get_wavelet_type(wavelet_type)
Return an internal wavelet object for the given type/parameter pair.
- Parameters:
wavelet_type – Tuple (name, param) or (name,). Supported names: "Morlet", "Paul", "DOG", "MexicanHat".
- Returns:
Corresponding wavelet instance (_Morlet, _Paul, _DOG, _MexicanHat).
- msnoise.core.signal.get_wct_avgcoh(freqs, tvec, wcoh, freqmin, freqmax, lag_min=5, coda_cycles=20)
Calculate average wavelet coherence over a frequency range and coda window.
- Parameters:
freqs – Frequency array.
tvec – Time axis.
wcoh – Wavelet coherence array (freqs × taxis).
freqmin – Lower frequency bound.
freqmax – Upper frequency bound.
lag_min – Minimum coda lag in seconds.
coda_cycles – Number of periods to use as coda window width.
- Returns:
Average coherence per frequency bin within [freqmin, freqmax].
- msnoise.core.signal.get_window(window='boxcar', half_win=3)
Return a normalised complex smoothing window for MWCS processing.
- Parameters:
window – "boxcar" (default) or "hanning".
half_win – Half-width in samples (full window = 2*half_win+1).
- Returns:
Complex numpy array of length 2*half_win+1, sum-normalised.
- msnoise.core.signal.make_same_length(st)
This function takes a stream whose traces share a common sampling rate and makes sure that all channels have the same length and the same gaps.
- msnoise.core.signal.nextpow2(x)
Returns the next power of 2 of x.
- msnoise.core.signal.preload_instrument_responses(session, return_format='dataframe')
This function preloads all instrument responses from response_path and stores the seed ids, start and end dates, and paz for every channel in a DataFrame. Any file readable by obspy’s read_inventory will be processed.
- Parameters:
session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect().
return_format (str) – The format of the returned object, either dataframe or inventory.
- Return type:
- Returns:
A table containing all channels with the time of operation and poles and zeros (DataFrame), or an obspy Inventory object.
- msnoise.core.signal.prepare_abs_positive_fft(line, sampling_rate)
Return the positive-frequency part of the absolute FFT of line.
- Parameters:
line – 1-D signal array.
sampling_rate – Sampling rate in Hz.
- Returns:
(freq, val) – positive-frequency vector and absolute FFT values.
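Equivalent behaviour is easy to sketch with numpy's real FFT (illustrative; the actual normalisation may differ):

```python
import numpy as np

def abs_positive_fft(line, sampling_rate):
    # Positive-frequency vector and the absolute FFT values.
    val = np.abs(np.fft.rfft(line))
    freq = np.fft.rfftfreq(len(line), d=1.0 / sampling_rate)
    return freq, val
```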
- msnoise.core.signal.psd_df_rms(d, freqs, output='VEL')
Compute per-frequency-band RMS from PSD data.
- Parameters:
d – xarray.Dataset with a PSD variable and dims (times, periods), as returned by xr_load_psd().
freqs – List of (fmin, fmax) tuples defining frequency bands.
output – Physical unit – "VEL" (default), "ACC", or "DISP".
- Returns:
xarray.Dataset with one RMS variable and dims (times, bands), ready for xr_save_rms().
- msnoise.core.signal.psd_rms(s, f)
Compute RMS from a power spectrum array and frequency array.
- Parameters:
s – Power spectral density values (1-D array).
f – Frequency values (1-D array, same length as s).
- Returns:
Float - square-root of the integrated power.
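The relation RMS = sqrt(∫ PSD df) can be sketched with trapezoidal integration, assuming s is in linear power units rather than dB:

```python
import numpy as np

def rms_from_psd(s, f):
    # Trapezoidal integration of the PSD over frequency, then sqrt.
    power = np.sum(0.5 * (s[1:] + s[:-1]) * np.diff(f))
    return float(np.sqrt(power))
```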
- msnoise.core.signal.save_preprocessed_streams(stream, output_dir, step_name, goal_day)
Write preprocessed traces to per-station files.
Output layout:
<output_dir>/<step_name>/_output/<goal_day>/<NET.STA.LOC>.mseed
One file per station (all channels for that station in the same file). This is concurrency-safe: each station writes its own file with no shared state between parallel workers.
- Parameters:
stream – Stream to write.
output_dir – Base output directory (params.global_.output_folder).
step_name – Workflow step name (e.g. "preprocess_1").
goal_day – Processing date string (YYYY-MM-DD).
- Returns:
List of written file paths (one per station).
- msnoise.core.signal.smoothCFS(cfs, scales, dt, ns, nt)
Smooth CWT coefficients along both time and scale axes.
- Parameters:
cfs – CWT coefficient array, shape (n_scales, n_times).
scales – 1-D array of wavelet scales.
dt – Sampling interval in seconds.
ns – Length of the moving-average filter across scales.
nt – Gaussian width parameter along time.
- Returns:
Smoothed coefficient array, same shape as cfs.
- msnoise.core.signal.stack(data, stack_method='linear', pws_timegate=10.0, pws_power=2, goal_sampling_rate=20.0, freqmin=1.0, freqmax=10.0, tfpws_nscales=20)
Stack an array of CCF traces into a single representative trace.
Three methods are available, selected via stack_method:
- Linear stack ("linear"): The arithmetic mean across all input traces:
\[s(t) = \frac{1}{N} \sum_{j=1}^{N} d_j(t)\]
Incoherent noise cancels as \(1/\sqrt{N}\). Fastest and most transparent, but offers no protection against high-amplitude transients (earthquakes, instrumental glitches) that survive pre-processing.
- Phase-weighted stack ("pws"): Introduced by Schimmel & Paulssen (1997) [2]. Each sample is weighted by the instantaneous phase coherence \(c(t)\) of the analytic signal across all traces:
\[c(t) = \frac{1}{N} \left| \sum_{j=1}^{N} e^{i\,\phi_j(t)} \right|, \qquad c(t) \in [0, 1]\]
where \(\phi_j(t) = \arg\bigl(d_j(t) + i\,\mathcal{H}\{d_j\}(t)\bigr)\) is the instantaneous phase of trace j obtained via the Hilbert transform \(\mathcal{H}\). The coherence is smoothed with a boxcar window of pws_timegate seconds before being raised to the power v = pws_power:
\[s(t) = \frac{1}{N} \sum_{j=1}^{N} d_j(t) \cdot c(t)^v\]
High-amplitude incoherent transients have random phases across traces, so \(c(t) \approx 0\) at those times; coherent arrivals have \(c(t) \approx 1\) and are preserved.
- Time-frequency phase-weighted stack ("tfpws"): The TF extension of PWS by Schimmel & Gallart (2007) [3]. Phase coherence is computed in the time-frequency domain via a continuous wavelet transform (CWT) with a complex Morlet wavelet, giving a coherence map \(c(a, t)\) that is both scale- and time-dependent. Averaging over the nscales log-spaced scales spanning [freqmin, freqmax] Hz yields a single per-lag weight:
\[W_j(a, t) = \mathcal{W}\{d_j\}(a, t)\]
\[c(a, t) = \frac{1}{N} \left| \sum_{j=1}^{N} e^{i\,\arg W_j(a,t)} \right|\]
\[w(t) = \left[ \frac{1}{A} \sum_{a} c(a, t) \right]^v, \qquad s(t) = \frac{1}{N} \sum_{j=1}^{N} d_j(t) \cdot w(t)\]
where \(A\) is the number of scales. Because coherence is evaluated independently at each scale, tf-PWS is more sensitive to narrow-band coherent arrivals than time-domain PWS. It is particularly effective for noise autocorrelations where body-wave reflections occupy a limited frequency band (Romero & Schimmel 2018 [4]).
Note
Memory scales as \(O(N \times A \times T)\) complex128. For long CCFs or large archives consider reducing nscales (default 20) or chunking pairs outside this function.
- Parameters:
data (numpy.ndarray) – 2-D array of shape (N_traces, N_lags), each row one CCF.
stack_method (str) – "linear", "pws", or "tfpws".
pws_timegate (float) – Boxcar smoothing window for the PWS coherence estimate, in seconds ("pws" only). Default 10 s.
pws_power (float) – Exponent v applied to the coherence weight. Larger values increase selectivity. Shared by "pws" and "tfpws". Default 2.
goal_sampling_rate (float) – Sampling rate of the CCF array (Hz).
freqmin (float) – Lower frequency bound (Hz) for the CWT scale range — "tfpws" only. Should match the parent filter’s freqmin.
freqmax (float) – Upper frequency bound (Hz) for the CWT scale range — "tfpws" only. Should match the parent filter’s freqmax.
tfpws_nscales (int) – Number of log-spaced CWT scales between freqmin and freqmax — "tfpws" only. Default 20.
- Return type:
- Returns:
1-D stacked CCF of length N_lags, or [] if no valid (non-NaN) traces are present.
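A compact sketch of the phase-weighted stack described above (FFT-based analytic signal; the pws_timegate smoothing of c(t) is omitted for brevity):

```python
import numpy as np

def analytic(data):
    # FFT-based analytic signal, row-wise (equivalent to scipy.signal.hilbert).
    n = data.shape[-1]
    spec = np.fft.fft(data, axis=-1)
    h = np.zeros(n)
    h[0] = 1.0
    if n % 2 == 0:
        h[n // 2] = 1.0
        h[1:n // 2] = 2.0
    else:
        h[1:(n + 1) // 2] = 2.0
    return np.fft.ifft(spec * h, axis=-1)

def pws(data, power=2):
    # Weight the linear stack by instantaneous phase coherence c(t)**power.
    phase = np.exp(1j * np.angle(analytic(data)))
    coherence = np.abs(phase.mean(axis=0))
    return data.mean(axis=0) * coherence ** power
```

For perfectly coherent input, c(t) = 1 everywhere and the result equals the linear stack.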
- msnoise.core.signal.validate_stack_data(dataset, stack_type='reference')
Validates stack data before processing.
- Parameters:
dataset – xarray Dataset to validate.
stack_type – Type of stack (“reference” or “moving”) for error messages.
- Returns:
(is_valid, message) tuple
- msnoise.core.signal.wiener_filt(data, M, N, gap_threshold)
Apply a 2-D Wiener filter to the CCF dataset, segment by segment.
Operates only on continuous (non-NaN) segments of the time axis to avoid smearing across data gaps.
- Parameters:
data – xarray Dataset containing a CCF variable (times × taxis).
M – Wiener filter window size along the time axis.
N – Wiener filter window size along the lag axis.
gap_threshold – Passed to find_segments().
- Returns:
Copy of data with the filtered CCF variable.
- msnoise.core.signal.winsorizing(data, params, input='timeseries', nfft=0)
Clip (Winsorise) a 2-D data array in the time or frequency domain.
Supports both one-shot sign-clipping (winsorizing == -1) and RMS-based clipping (winsorizing > 0). When input is "fft" the array is temporarily transformed back to the time domain, clipped, then re-transformed.
- Parameters:
data – 1-D or 2-D array of shape (n_traces, n_samples).
params – MSNoise params object; must expose params.cc.winsorizing.
input – "timeseries" (default) or "fft".
nfft – FFT length used when input is "fft"; ignored otherwise.
- Returns:
Clipped array (same shape as input).
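The RMS-clipping branch (winsorizing > 0) can be sketched as follows (a hypothetical helper; the sign-only branch would instead replace each sample by its sign):

```python
import numpy as np

def winsorize_rms(data, n_rms):
    # Clip each trace at +/- n_rms times its own RMS.
    out = np.atleast_2d(np.asarray(data, dtype=float)).copy()
    for row in out:
        limit = n_rms * np.sqrt(np.mean(row ** 2))
        np.clip(row, -limit, limit, out=row)
    return out
```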
- msnoise.core.signal.xwt(trace_ref, trace_current, fs, ns=3, nt=0.25, vpo=12, freqmin=0.1, freqmax=8.0, nptsfreq=100, wavelet_type=('Morlet', 6.0))
Wavelet Coherence Transform (WCT) between two time series (Grinsted et al.[5]; traveltime estimation: Mao et al.[1]).
Convenience wrapper around prepare_ref_wct() + apply_wct(). Use those two functions directly in hot loops (Mode A fixed-REF) to avoid recomputing the reference CWT on every call.
- Parameters:
trace_ref – Reference signal (1-D array).
trace_current – Current signal (1-D array, same length).
fs – Sampling frequency in Hz.
ns – Scale-axis smoothing parameter.
nt – Time-axis smoothing parameter.
vpo – Voices-per-octave; higher = finer scale resolution.
freqmin – Lowest frequency of interest (Hz).
freqmax – Highest frequency of interest (Hz).
nptsfreq – Number of frequency points between freqmin and freqmax.
wavelet_type – (name, param) tuple passed to get_wavelet_type().
- Returns:
(WXamp, WXspec, WXangle, Wcoh, WXdt, freqs, coi)
Computation (CC / MWCS / WCT)
MSNoise core computation functions — cross-correlation, whitening, MWCS.
Moved from msnoise/move2obspy.py.
- msnoise.core.compute.mwcs(current, reference, freqmin, freqmax, df, tmin, window_length, step, smoothing_half_win=5)
Moving Window Cross-Spectrum (MWCS) time-delay measurement.
Both time series are sliced into overlapping windows. Each window is mean-adjusted and cosine-tapered (85%) before being Fourier-transformed.
The cross-spectrum between the reference and current windows is:
\[X(\nu) = F_\text{ref}(\nu)\, F_\text{cur}^*(\nu)\]
where \({}^*\) denotes complex conjugation. \(X(\nu)\) is smoothed by convolution with a Hanning window. Cross-coherency is then:
\[C(\nu) = \frac{ \left| \overline{X(\nu)} \right| }{ \sqrt{ \overline{\left|F_\text{ref}(\nu)\right|^2}\; \overline{\left|F_\text{cur}(\nu)\right|^2} } }\]
where the over-bar denotes smoothing. The mean coherence per window is the mean of \(C(\nu)\) over the frequency band.
The unwrapped phase of \(X(\nu)\) is linearly proportional to frequency:
\[\phi_j = 2\pi\,\delta t\,\nu_j\]
so the time delay \(\delta t\) is the slope of a weighted linear regression over the frequency band (Clarke et al.[6], extending Poupinet et al.[7]). Weights incorporate both cross-spectral amplitude and coherence. The slope error is:
\[e_m = \sqrt{ \sum_j \left( \frac{w_j\,\nu_j}{\sum_i w_i\,\nu_i^2} \right)^2 \sigma_\phi^2 }, \qquad \sigma_\phi^2 = \frac{\sum_j (\phi_j - m\,\nu_j)^2}{N-1}\]
where \(w_j\) are the per-sample weights, \(\nu_j\) the frequency samples, and \(m\) the fitted slope.
Returns one row per moving window: central lag time, \(\delta t\), error, and mean coherence.
Warning
The time series will not be filtered before computing the cross-spectrum! They should be band-pass filtered around the freqmin-freqmax band of interest beforehand.
- Parameters:
current (numpy.ndarray) – The “Current” timeseries.
reference (numpy.ndarray) – The “Reference” timeseries.
freqmin (float) – The lower frequency bound to compute the dephasing (in Hz).
freqmax (float) – The higher frequency bound to compute the dephasing (in Hz)
df (float) – The sampling rate of the input timeseries (in Hz)
tmin (float) – The leftmost time lag (used to compute the “time lags array”)
window_length (float) – The moving window length (in seconds)
step (float) – The step to jump for the moving window (in seconds)
smoothing_half_win (int) – If different from 0, defines the half length of the smoothing hanning window.
- Return type:
- Returns:
[time_axis, delta_t, delta_err, delta_mcoh]. time_axis contains the central times of the windows. The three other columns contain dt, error and mean coherence for each window.
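The core delay estimate, a weighted zero-intercept regression of unwrapped phase against frequency, can be sketched as follows (simplified; the real routine also derives the weights from amplitude and coherence and returns the error e_m):

```python
import numpy as np

def delay_from_phase(freqs, phase, weights):
    # phi_j = 2*pi*dt*nu_j: weighted least-squares slope, then dt.
    slope = np.sum(weights * freqs * phase) / np.sum(weights * freqs ** 2)
    return slope / (2.0 * np.pi)
```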
- msnoise.core.compute.myCorr2(data, maxlag, energy, index, plot=False, nfft=None, normalized=False)
Compute cross-correlations for all requested station pairs.
Tiled-batch implementation: processes pairs in chunks of _MYCORR2_CHUNK to amortise Python loop overhead while keeping peak memory bounded. Faster than the original per-pair loop for N > ~50 stations (~2-3x for N=200-500); falls back gracefully to near-loop speed for small N.
- Parameters:
data (numpy.ndarray) – 2-D array (n_stations, nfft) containing the FFT of each pre-whitened time series.
maxlag (int) – Output half-length in samples; CCF returned over [-maxlag : maxlag] (length 2*maxlag + 1).
energy (numpy.ndarray) – Per-station RMS energy (n_stations,) used for POW normalisation.
index (list) – List of (ccf_id, sta1_idx, sta2_idx) tuples.
normalized (str or bool) – "POW", "MAX", "ABSMAX", or falsy for none.
- Return type:
- Returns:
{ccf_id: ccf_array} for every pair in index.
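The per-pair kernel can be sketched in the frequency domain (the conjugation convention and normalisation here are illustrative, not necessarily those used by myCorr2()):

```python
import numpy as np

def ccf_pair(fft1, fft2, maxlag, nfft):
    # Inverse FFT of the cross-spectrum, zero lag shifted to the centre,
    # trimmed to [-maxlag, +maxlag] samples.
    corr = np.real(np.fft.ifft(fft1 * np.conj(fft2), nfft))
    corr = np.fft.fftshift(corr)
    mid = nfft // 2
    return corr[mid - maxlag: mid + maxlag + 1]
```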
- msnoise.core.compute.pcc_xcorr(data, maxlag, energy, index, plot=False, nfft=None, normalized=False)
Phase Cross-Correlation v=2 (PCC2) — pure NumPy/SciPy implementation.
Replaces the former dependency on the unmaintained phasecorr package with a self-contained translation of the FFT-accelerated pcc2_set routine from FastPCC (Ventosa et al.[8]; Ventosa and Schimmel[9]).
Algorithm (matches FastPCC pcc2_set):
- Compute the amplitude-normalised analytic signal (phase signal) φ[n] = X_a[n] / |X_a[n]| for each trace — amplitude information is discarded entirely, so the result is insensitive to amplitude transients (earthquakes, glitches) without explicit temporal normalisation.
- Zero-pad to Nz = next_fast_len(N + maxlag) to avoid circular wrap-around (linear cross-correlation).
- Compute PCC2(lag) = IFFT(conj(FFT(φ1)) · FFT(φ2)) / (Nz · N) for every pair in index — O(N log N), same cost as GNCC.
- Parameters:
data (numpy.ndarray) – 2-D time-domain array (n_stations, N); real-valued. Unlike myCorr2(), PCC2 requires the time-domain input because the Hilbert transform must be computed before any FFT.
maxlag (int or float) – Half-length of output CCF in samples.
energy – Unused (kept for API compatibility with myCorr2()).
index – List of (ccf_id, sta1_idx, sta2_idx) tuples.
normalized – "MAX" or "ABSMAX" to normalise output; falsy for none. ("POW" is meaningless for PCC2 since amplitudes are discarded; it is silently ignored.)
- Return type:
- Returns:
{ccf_id: ccf_array} of length 2*maxlag + 1 per pair.
References
- msnoise.core.compute.smooth(x, window='boxcar', half_win=3)
Smooth a 1-D array with a symmetric window.
Pads the signal by reflection at both ends (length 2*half_win + 1) to reduce boundary effects, then convolves with a normalised window.
x (numpy.ndarray) – 1-D input array.
window (str) – "boxcar" (uniform, default) or "hanning".
half_win (int) – Half-width of the window; full width = 2*half_win+1.
- Return type:
- Returns:
Smoothed array, same length as x.
- msnoise.core.compute.whiten(data, Nfft, delta, freqmin, freqmax, plot=False, returntime=False)
Spectral whitening (1-bit / brutal mode) for a single real trace.
Computes the FFT of data, normalises the amplitude to unity in the passband [freqmin, freqmax], and returns the result. A 100-sample cosine taper transitions smoothly to zero outside the passband, ensuring no sharp spectral edges. This is the “brutal” whitening described by Bensen et al.[10] (their Section 2.2).
\[\tilde{X}[k] = \exp\!\left(i\,\arg X[k]\right) \qquad \text{for } \nu_k \in [f_\text{low},\, f_\text{high}]\]
Bins outside the passband are zeroed (with a cosine-tapered transition region); Hermitian symmetry is enforced so that the inverse FFT yields a real signal.
- Parameters:
data (numpy.ndarray) – 1-D real time series.
Nfft (int) – FFT length (zero-padding if larger than len(data)).
delta (float) – Sampling interval in seconds (1 / sampling_rate).
freqmin (float) – Lower passband frequency (Hz).
freqmax (float) – Upper passband frequency (Hz).
plot (bool) – Show a diagnostic plot of the whitening stages (default False).
returntime (bool) – If True, return the whitened time-domain signal instead of the frequency-domain array.
- Return type:
- Returns:
Whitened one-sided FFT array (complex) of length Nfft, or the corresponding real time-domain signal if returntime is True.
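A minimal sketch of the brutal mode, without the cosine taper at the band edges that the real function applies:

```python
import numpy as np

def whiten_brutal(data, nfft, delta, freqmin, freqmax):
    # Unit amplitude (phase only) inside the passband, zero outside.
    spec = np.fft.rfft(data, nfft)
    freqs = np.fft.rfftfreq(nfft, d=delta)
    band = (freqs >= freqmin) & (freqs <= freqmax)
    out = np.zeros_like(spec)
    out[band] = np.exp(1j * np.angle(spec[band]))
    return out
```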
- msnoise.core.compute.whiten2(fft, Nfft, low, high, porte1, porte2, psds, whiten_type)
Vectorised in-place spectral whitening for a batch of pre-computed FFTs.
Operates on the one-sided positive-frequency half of each FFT row and enforces Hermitian symmetry afterward. Three modes are available via whiten_type, corresponding to the normalisation strategies described by Bensen et al.[10]:
brutal (default) — one-bit normalisation: amplitude set to unity inside the passband with a cosine taper at both edges:
\[\tilde{X}[k] = \exp\!\left(i\,\arg X[k]\right)\]
HANN — one-bit normalisation weighted by a Hann window across the passband, smoothly tapering the spectral amplitude:
\[\tilde{X}[k] = \frac{X[k]}{|X[k]|}\cdot w_\text{Hann}[k]\]
PSD — divide by a pre-computed smoothed PSD, then clip outlier bins at the 5th–95th percentile to suppress spectral spikes:
\[\tilde{X}[k] = \operatorname{clip}\!\left( \frac{X[k]}{S[k]},\,-A,\,A \right)\]
where \(S[k]\) is the smoothed PSD and \(A\) is the RMS of the non-outlier bins.
- Parameters:
fft (numpy.ndarray) – 2-D complex array (n_traces, Nfft) of pre-computed FFTs. Modified in-place.
Nfft (int) – FFT length (must match fft.shape[1]).
low (int) – Bin index where the left cosine taper begins (below passband).
high (int) – Bin index where the right cosine taper ends (above passband).
porte1 (int) – First bin of the flat passband.
porte2 (int) – Last bin of the flat passband.
psds (numpy.ndarray or None) – Smoothed PSD array (n_traces, Nfft//2+1); used only when whiten_type is "PSD".
whiten_type (str) – One of "brutal" (default), "HANN", or "PSD".
- Returns:
None (modifies fft in-place).
- msnoise.core.compute.compute_wct_dtt_batch(freqs, taxis, WXamp, Wcoh, WXdt, dtt_params, dist: float = 0.0)
Compute WCT dt/t and average coherence for one time-step.
Shared implementation used by both the fused path in s08 (where WXamp/Wcoh/WXdt are freshly computed in memory) and the standalone s09 path (where they are loaded from a WCT NetCDF file).
- Parameters:
freqs – 1-D frequency array from the WCT (Hz).
taxis – 1-D lag-time axis array (s).
WXamp – 2-D cross-wavelet amplitude array (freqs, taxis).
Wcoh – 2-D wavelet coherence array (freqs, taxis).
WXdt – 2-D time-delay array (freqs, taxis).
dtt_params – Merged params object containing wavelet_dtt.* attributes.
dist – Interstation distance in km (for dynamic lag). Default 0.
- Returns:
Tuple
(dtt_row, err_row, coh_row, freqs_subset)where dtt_row and err_row are 1-D arrays over the DTT frequency subset and coh_row is the average coherence per frequency bin.
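Conceptually, each dt/t value is a coherence-weighted linear fit of the measured time delays against lag time inside the coda window. The `wct_dtt_row` function below is a hypothetical, heavily simplified sketch of that fit for a single frequency bin; MSNoise's actual routine handles the full frequency subset, its error model, and the `wavelet_dtt.*` parameter plumbing:

```python
import numpy as np

def wct_dtt_row(taxis, wxdt_f, coh_f, lag_min, lag_max, coh_thresh=0.5):
    """Illustrative weighted dt/t fit for one frequency bin (sketch only).

    Selects coda lags with |t| in [lag_min, lag_max] where coherence
    exceeds coh_thresh, then fits dt = m * t through the origin with
    coherence weights; the slope m is dt/t.
    """
    sel = (np.abs(taxis) >= lag_min) & (np.abs(taxis) <= lag_max) \
          & (coh_f >= coh_thresh)
    if sel.sum() < 2:
        return np.nan, np.nan
    t, dt, w = taxis[sel], wxdt_f[sel], coh_f[sel]
    m = np.sum(w * t * dt) / np.sum(w * t * t)  # weighted zero-intercept slope
    resid = dt - m * t
    # rough slope uncertainty from the weighted residuals
    err = np.sqrt(np.sum(w * resid ** 2) / ((t.size - 1) * np.sum(w * t * t)))
    return m, err
```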
- msnoise.core.compute.resolve_wct_lag_min(dtt_params, dist: float) float
Resolve the coda lag minimum for WCT dt/t computation.
- Parameters:
dtt_params – Params object with wavelet_dtt attributes.
dist – Interstation distance in km (used for dynamic lag).
- Returns:
Lag minimum in seconds.
- msnoise.core.compute.build_wct_dtt_dataset(dates_list, dtt_rows, err_rows, coh_rows, freqs_subset)
Build a sorted xarray Dataset of WCT dt/t results.
Shared between the fused save path in s08 and the save path in s09.
- Parameters:
dates_list – List of datetime64 timestamps.
dtt_rows – List of 1-D dt/t arrays (one per date).
err_rows – List of 1-D error arrays.
coh_rows – List of 1-D average-coherence arrays.
freqs_subset – 1-D frequency array for the DTT band.
- Returns:
xarray.Dataset with variables DTT, ERR, COH and dims (times, frequency), sorted by times.
Preprocessing
S. Ventosa, M. Schimmel, and E. Stutzmann. Towards the processing of large data volumes with phase cross-correlation. Seismological Research Letters, 90(4):1663–1669, 2019. doi:10.1785/0220190022.
S. Ventosa and M. Schimmel. FastPCC: fast phase cross-correlation algorithm for large seismic datasets. IEEE Transactions on Geoscience and Remote Sensing, 61:1–17, 2023. doi:10.1109/TGRS.2023.3294302.
G. D. Bensen, M. H. Ritzwoller, M. P. Barmin, A. L. Levshin, F. Lin, M. P. Moschetti, N. M. Shapiro, and Y. Yang. Processing seismic ambient noise data to obtain reliable broad-band surface wave dispersion measurements. Geophysical Journal International, 169(3):1239–1260, 2007. doi:10.1111/j.1365-246X.2007.03374.x.
Waveform Pre-processing
Pairs are first split and a station list is created. The database is then
queried to get file paths. For each station, all files potentially containing
data for the day are opened. The traces are then merged and split, to obtain
the most continuous chunks possible. The different chunks are then demeaned,
tapered, and merged again into a 1-day long trace. If a chunk is not aligned
on the sampling grid (that is, does not start at an integer multiple of the
sample spacing in seconds), the chunk is phase-shifted in the frequency domain,
which requires tapering and an FFT/iFFT round trip. If the gap between two
chunks is small compared to preprocess_max_gap, the gap is filled with
interpolated values; larger gaps are left unfilled.
Each 1-day long trace is then high-passed (at preprocess_highpass Hz), then,
if needed, low-passed (at preprocess_lowpass Hz) and decimated/downsampled.
Decimation/downsampling is configurable (resampling_method) and users are
advised to test Decimate. One advantage of Downsampling over Decimation is
that it can resample the data by any factor, not only integer factors.
Downsampling is achieved with the ObsPy Lanczos resampler, which we tested
against the old scikits.samplerate.
If configured, each 1-day long trace is corrected for its instrument response. Currently, only dataless SEED and inventory XML are supported.
As of MSNoise 1.5, the preprocessing routine is separate from compute_cc and can be used by plugins with their own parameters. The routine returns a Stream object containing all the traces for all the stations/components.
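The small-gap handling described above can be sketched in pure NumPy. `fill_small_gap` below is a hypothetical helper, not MSNoise's code, showing the decision the text describes: interpolate across the gap when it is short enough, otherwise keep the chunks separate:

```python
import numpy as np

def fill_small_gap(chunk_a, chunk_b, gap_samples, max_gap_samples):
    """Illustrative gap handling between two data chunks (sketch only).

    If the gap (in samples) is at most max_gap_samples, it is filled by
    linear interpolation between the last sample of chunk_a and the
    first sample of chunk_b; otherwise the chunks are left separate.
    """
    if gap_samples > max_gap_samples:
        return None  # gap too large: do not merge
    # endpoints included in linspace, then stripped, leaving only the fill
    fill = np.linspace(chunk_a[-1], chunk_b[0], gap_samples + 2)[1:-1]
    return np.concatenate([chunk_a, fill, chunk_b])
```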
- msnoise.core.preprocessing.apply_preprocessing_to_stream(stream, params, responses=None, logger=None)
Apply MSNoise signal-processing pipeline to a raw ObsPy Stream.
This is the core preprocessing logic extracted from preprocess() so it can be used for both local-archive and FDSN/EIDA data paths.
Operations applied (in order):
Phase-shift alignment
Gap detection and filling (up to preprocess_max_gap samples)
Sampling-rate check and down-sampling (Resample / Decimate / Lanczos)
Detrend, taper, highpass / lowpass filter
Instrument response removal (if remove_response is set)
- Parameters:
stream – Raw Stream for one station.
params – MSNoiseParams for this lineage.
responses – ObsPy Inventory for instrument response removal, or None.
logger – Python logger (defaults to msnoise.preprocessing).
- Returns:
Processed
Stream, or an empty stream if the data could not be processed.
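The phase-shift alignment step relies on the FFT shift theorem: a subsample delay in time is a linear phase ramp in frequency. A minimal sketch follows — a hypothetical `phase_shift` helper, assuming an already-tapered trace that is effectively periodic over the window:

```python
import numpy as np

def phase_shift(data, dt_shift, sampling_rate):
    """Shift a trace by a (sub)sample amount via the FFT shift theorem.

    Multiplying the spectrum by exp(-2j*pi*f*dt) delays the signal by
    dt seconds; the shift is circular, hence the need for tapering.
    """
    n = len(data)
    freqs = np.fft.rfftfreq(n, d=1.0 / sampling_rate)
    spec = np.fft.rfft(data) * np.exp(-2j * np.pi * freqs * dt_shift)
    return np.fft.irfft(spec, n)
```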
- msnoise.core.preprocessing.preprocess(stations, comps, goal_day, params, responses=None, loglevel='INFO', logger='msnoise.compute_cc_norot_child')
Fetches data for each entry in stations and each entry in comps using the data_availability table in the database.
To correct for instrument responses, make sure to set remove_response to “Y” in the config and to provide the responses DataFrame.
- Example:
>>> from msnoise.api import connect, get_params, preload_instrument_responses
>>> from msnoise.core.preprocessing import preprocess
>>> db = connect()
>>> params = get_params(db)
>>> responses = preload_instrument_responses(db)
>>> st = preprocess(db, ["YA.UV06", "YA.UV10"], ["Z",], "2010-09-01", params, responses, loglevel="INFO")
>>> st
2 Trace(s) in Stream:
YA.UV06.00.HHZ | 2010-09-01T00:00:00.000000Z - 2010-09-01T23:59:59.950000Z | 20.0 Hz, 1728000 samples
YA.UV10.00.HHZ | 2010-09-01T00:00:00.000000Z - 2010-09-01T23:59:59.950000Z | 20.0 Hz, 1728000 samples
- Parameters:
db (sqlalchemy.orm.session.Session) – A Session object, as obtained by msnoise.api.connect().
stations (list of str) – a list of station names, in the format NET.STA.LOC
comps (list of str) – a list of component names, in Z,N,E,1,2.
goal_day (str) – the day of data to load, ISO 8601 format: e.g. 2016-12-31.
params (class) – an object containing the config parameters, as obtained by msnoise.api.get_params().
responses (pandas.DataFrame) – a DataFrame containing the instrument responses, as obtained by msnoise.api.preload_instrument_responses().
- Return type:
- Returns:
A Stream object containing all traces.
Stations & Data Sources
MSNoise station and data-availability management.
- msnoise.core.stations.ARCHIVE_STRUCTURES = {'BUD': 'NET/STA/STA.NET.LOC.CHAN.YEAR.DAY', 'IDDS': 'YEAR/NET/STA/CHAN.TYPE/DAY/NET.STA.LOC.CHAN.TYPE.YEAR.DAY.HOUR', 'PDF': 'YEAR/STA/CHAN.TYPE/NET.STA.LOC.CHAN.TYPE.YEAR.DAY', 'SDS': 'YEAR/NET/STA/CHAN.TYPE/NET.STA.LOC.CHAN.TYPE.YEAR.DAY'}
Archive path templates keyed by structure name. Used by s01_scan_archive() and populate_stations().
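A template such as the SDS entry expands by plain token substitution. `expand_template` below is a hypothetical illustration of that expansion, not the scanner's actual code:

```python
# Hypothetical helper: expand an archive path template like the SDS entry
# in ARCHIVE_STRUCTURES by plain token substitution (illustrative only).
def expand_template(template, **tokens):
    path = template
    # replace longer tokens first so e.g. 'YEAR' is consumed before shorter keys
    for key in sorted(tokens, key=len, reverse=True):
        path = path.replace(key, str(tokens[key]))
    return path

sds = "YEAR/NET/STA/CHAN.TYPE/NET.STA.LOC.CHAN.TYPE.YEAR.DAY"
p = expand_template(sds, YEAR="2010", NET="YA", STA="UV06", LOC="00",
                    CHAN="HHZ", TYPE="D", DAY="244")
```

For the values above, `p` becomes `2010/YA/UV06/HHZ.D/YA.UV06.00.HHZ.D.2010.244`.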
- msnoise.core.stations.add_data_source(session, name, uri='', data_structure='SDS', auth_env='MSNOISE')
Add a new DataSource to the database.
- Parameters:
session – SQLAlchemy session.
name – Human label, e.g. "local", "IRIS", "EIDA".
uri – Data location URI. Schemes:
- bare path or sds:///path → local SDS archive
- fdsn://http://... → FDSN web service
- eida://http://... → EIDA routing client
data_structure – SDS sub-path format for local sources (default "SDS"). Ignored for FDSN/EIDA.
auth_env – Environment variable prefix for credentials (default "MSNOISE"). Worker looks up {auth_env}_FDSN_USER, {auth_env}_FDSN_PASSWORD, {auth_env}_FDSN_TOKEN.
- Returns:
The created DataSource object.
- msnoise.core.stations.check_stations_uniqueness(session, station)
- Parameters:
session
station
- Returns:
- msnoise.core.stations.get_data_availability(session, net=None, sta=None, loc=None, chan=None, starttime=None, endtime=None)
Returns the DataAvailability objects for specific net, sta, starttime or endtime.
- Parameters:
session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()
net (str) – Network code
sta (str) – Station code
starttime (datetime.datetime, datetime.date) – Start time of the search
endtime (datetime.datetime, datetime.date) – End time of the search
- Return type:
list
- Returns:
list of
DataAvailability
- msnoise.core.stations.get_data_source(session, id=None, name=None)
Retrieve a DataSource by id or name.
- Parameters:
session – SQLAlchemy session.
id – Primary key of the DataSource.
name – Human label of the DataSource.
- Returns:
DataSource or None.
- Raises:
ValueError – If neither id nor name is provided.
- msnoise.core.stations.get_default_data_source(session)
Return the project default DataSource.
The default is the DataSource with ref=1 (created by the installer).
- Parameters:
session – SQLAlchemy session.
- Returns:
- Raises:
RuntimeError – If no DataSource exists (installer not run).
- msnoise.core.stations.distance_from_coords(x1, y1, x2, y2, coordinates='DEG')
Compute interstation distance in km from raw coordinate values.
Pure function — no ORM objects required. Used internally by get_interstation_distance() and by MSNoiseResult.get_distance() in DB-free archive mode.
- Parameters:
x1 – Longitude (DEG) or Easting (UTM) of station 1.
y1 – Latitude (DEG) or Northing (UTM) of station 1.
x2 – Longitude (DEG) or Easting (UTM) of station 2.
y2 – Latitude (DEG) or Northing (UTM) of station 2.
coordinates – "DEG" for WGS-84 lat/lon, "UTM" for metres.
- Returns:
Distance in kilometres.
- Return type:
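For coordinates="DEG", the distance is a great-circle computation. The sketch below uses the haversine formula as an illustrative stand-in — MSNoise may use a different geodesic internally:

```python
import math

def haversine_km(lon1, lat1, lon2, lat2):
    """Great-circle distance in km between two WGS-84 points (illustrative).

    Standalone sketch of what distance_from_coords computes for
    coordinates="DEG", using the haversine formula.
    """
    r = 6371.0  # mean Earth radius, km
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlam = math.radians(lon2 - lon1)
    a = (math.sin(dphi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2)
    return 2 * r * math.asin(math.sqrt(a))
```

One degree of latitude comes out near 111.2 km, the usual sanity check.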
- msnoise.core.stations.get_interstation_distance(station1, station2, coordinates='DEG')
Returns the distance in km between station1 and station2.
Warning
Currently both stations' coordinate systems have to be the same!
- msnoise.core.stations.get_new_files(session)
Returns the files marked “N”ew or “M”odified in the database
- Parameters:
session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()
- Return type:
list
- Returns:
list of
DataAvailability
- msnoise.core.stations.get_station(session, net, sta)
Get one Station from the database.
- Parameters:
session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()
net (str) – the network code
sta (str) – the station code
- Return type:
msnoise.msnoise_table_def.declare_tables.Station
- Returns:
a Station object
- msnoise.core.stations.get_station_pairs(session, used=None, net=None, include_single_station=False)
Returns an iterator over all possible station pairs. If auto-correlation is configured in the database, returns N*N pairs, otherwise returns N*(N-1)/2 pairs.
- Parameters:
session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()
used (bool, int) – Select only stations marked used if False (default) or all stations present in the database if True
net (str) – Network code to filter for the pairs.
- Return type:
iterable
- Returns:
An iterable of Station object pairs
- msnoise.core.stations.get_stations(session, all=False, net=None, format='raw')
Get Stations from the database.
- Parameters:
session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()
all (bool) – Returns all stations from the database if True, or only stations where used = 1 if False (default)
net (str) – if set, limits the stations returned to this network
- Return type:
list of msnoise.msnoise_table_def.declare_tables.Station
- Returns:
list of
Station
- msnoise.core.stations.get_waveform_path(session, da)
Reconstruct the full absolute path for a DataAvailability record.
Joins DataSource.uri with da.path and da.file. If the DA record has no data_source_id (legacy or NULL), falls back to treating da.path as an absolute path (backward-compatible behaviour).
- Parameters:
session – SQLAlchemy session.
da – DataAvailability ORM object.
- Returns:
Absolute path string to the waveform file.
- msnoise.core.stations.import_stationxml(session, path_or_url, data_source_id=None, save_to_response_path=True)
Import stations from a StationXML file or URL.
Parses the inventory with ObsPy and creates or updates Station rows. Also populates used_location_codes and used_channel_names from the channel-level inventory (requires level=channel in FDSN queries). Empty location codes are stored as "--" (MSNoise convention).
When save_to_response_path is True (default), the parsed inventory is written as a StationXML file into the project’s response_path directory (read from global config). This makes the instrument responses immediately available to the preprocessing step without any manual file copying. The file is named <NET>.<STA>.xml for single-network inventories, or inventory_<timestamp>.xml when the inventory spans multiple networks. Saving failures are logged as warnings but never abort the import.
- Parameters:
session – SQLAlchemy session.
path_or_url – Path to a local StationXML file, or a URL (e.g. an FDSN station web service query URL with level=channel).
data_source_id – DataSource to assign to imported stations. None → project default (DataSource.ref=1).
save_to_response_path – Write the inventory to response_path after a successful import (default True).
- Returns:
Tuple (created, updated, saved_path) where saved_path is the absolute path of the written file, or None if not saved.
- msnoise.core.stations.list_data_sources(session)
Return all DataSource rows.
- Parameters:
session – SQLAlchemy session.
- Returns:
List of
DataSource.
- msnoise.core.stations.mark_data_availability(session, net, sta, flag)
Updates the flag of all DataAvailability objects matching net.sta in the database.
- Parameters:
session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()
net (str) – Network code
sta (str) – Station code
flag (str) – Status of the DataAvailability object: New, Modified or Archive. Values accepted are {‘N’, ‘M’, ‘A’}
- msnoise.core.stations.read_waveforms_from_availability(session, da_records, t_start, t_end, logger=None)
Read waveforms from a list of DataAvailability records into a Stream.
Builds the full path for each record via get_waveform_path() (honouring the DataSource root), deduplicates paths, and reads each file sliced to [t_start, t_end] using ObsPy. Files that cannot be read are skipped with a debug log message.
This is the canonical “DA records → waveform Stream” helper that any worker (PSD, future steps) should use instead of hand-rolling os.path.join(f.path, f.file).
- Parameters:
session – SQLAlchemy session.
da_records – Iterable of DataAvailability ORM objects.
t_start – UTCDateTime start.
t_end – UTCDateTime end.
logger – Optional logging.Logger; uses the module logger if None.
- Returns:
Stream(may be empty).
- msnoise.core.stations.resolve_data_source(session, station)
Return the effective DataSource for a station.
If the station has data_source_id set, that DataSource is returned. Otherwise the project default (ref=1) is used.
- Parameters:
session – SQLAlchemy session.
station – Station ORM object.
- Returns:
- msnoise.core.stations.set_all_stations_source(session, data_source_id)
Assign a DataSource to every station in the database.
- Parameters:
session – SQLAlchemy session.
data_source_id – Primary key of the DataSource.
Nonereverts all to default.
- msnoise.core.stations.set_network_source(session, net, data_source_id)
Assign a DataSource to all stations of a network.
- Parameters:
session – SQLAlchemy session.
net – Network code.
data_source_id – Primary key of the DataSource.
Nonereverts to default.
- msnoise.core.stations.set_station_source(session, net, sta, data_source_id)
Assign a specific
DataSourceto a station.- Parameters:
session – SQLAlchemy session.
net – Network code.
sta – Station code.
data_source_id – Primary key of the DataSource to assign. Pass None to revert to the project default.
- Raises:
ValueError – If the station or DataSource does not exist.
- msnoise.core.stations.to_sds(stats, year, jday)
Build an SDS-format relative file path from ObsPy trace stats.
Returns a path string of the form:
YYYY/NET/STA/CHAN.D/NET.STA.LOC.CHAN.D.YYYY.DDD
- Parameters:
stats – obspy.core.trace.Stats object.
year – 4-digit year integer.
jday – Julian day-of-year integer (1-366).
- Returns:
Relative SDS path string.
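The resulting layout can be reproduced from plain NSLC fields. `sds_relative_path` below is a hypothetical stand-in for to_sds() that skips the ObsPy Stats object:

```python
# Illustrative stand-in for to_sds(): build the SDS relative path from plain
# NSLC fields instead of an obspy Stats object (hypothetical helper).
def sds_relative_path(net, sta, loc, chan, year, jday):
    # YYYY/NET/STA/CHAN.D/NET.STA.LOC.CHAN.D.YYYY.DDD
    return (f"{year:04d}/{net}/{sta}/{chan}.D/"
            f"{net}.{sta}.{loc}.{chan}.D.{year:04d}.{jday:03d}")
```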
- msnoise.core.stations.update_data_availability(session, net, sta, loc, chan, path, file, starttime, endtime, data_duration, gaps_duration, samplerate, data_source_id=None)
Updates a DataAvailability object in the database
- Parameters:
session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()
net (str) – The network code of the Station
sta (str) – The station code
chan (str) – The component (channel)
path (str) – Path to the folder containing the file, relative to DataSource.uri.
file (str) – The name of the file
starttime (datetime.datetime) – Start time of the file
endtime (datetime.datetime) – End time of the file
data_duration (float) – Cumulative duration of available data in the file
gaps_duration (float) – Cumulative duration of gaps in the file
samplerate (float) – Sample rate of the data in the file (in Hz)
data_source_id (int or None) – FK to DataSource. None → project default.
- msnoise.core.stations.update_data_source(session, id, **kwargs)
Update fields on an existing DataSource.
- Parameters:
session – SQLAlchemy session.
id – Primary key of the DataSource to update.
kwargs – Fields to update: name, uri, data_structure, auth_env.
- Returns:
Updated DataSource.
- Raises:
ValueError – If the DataSource does not exist.
- msnoise.core.stations.update_station(session, net, sta, X, Y, altitude, coordinates='UTM', instrument='N/A', used=1)
Updates or inserts a Station in the database.
See also
msnoise.msnoise_table_def.declare_tables.Station
- Parameters:
session (sqlalchemy.orm.session.Session) – A Session object, as obtained by connect()
net (str) – The network code of the Station
sta (str) – The station code
X (float) – The X coordinate of the station (Easting or Longitude)
Y (float) – The Y coordinate of the station (Northing or Latitude)
altitude (float) – The altitude of the station
coordinates (str) – The coordinates system. “DEG” is WGS84 latitude/ longitude in degrees. “UTM” is expressed in meters.
instrument (str) – The instrument code, useful with PAZ correction
used (bool) – Whether this station must be used in the computations.
- msnoise.core.stations.populate_stations(db, loglevel='INFO')
Scan the default DataSource archive and populate the Station table.
Walks the archive directory according to the configured data_structure (SDS, BUD, IDDS, PDF, or a custom format). Station coordinates are initialised to zero — use import_stationxml() afterwards to set real coordinates.
For custom data structures, a custom.py file in the current working directory must export a populate(data_folder) function that returns a {NET_STA: [net, sta, lon, lat, alt, coordinates]} dictionary.
- Parameters:
db – SQLAlchemy session.
loglevel – Logging verbosity (default "INFO").
- Returns:
True on success.
FDSN / EIDA Data Access
FDSN/EIDA waveform fetching and bulk SDS download for MSNoise.
This module is the only place in MSNoise that makes network calls to external data services. It provides two distinct modes of data access:
- On-the-fly fetch (used by the preprocess step)
fetch_and_preprocess() / fetch_raw_waveforms() — called per station per day during the preprocess worker loop. Streams waveforms directly from FDSN into the processing pipeline without writing a local archive.
- Bulk download (mass_download())
Wraps ObsPy’s MassDownloader to fetch all waveforms for the full project date range in one call, routing them into a day-aligned local SDS archive. Station gating is done server-side via an ObsPy Inventory built from the MSNoise station table — avoiding any NSLC cartesian blowup. After bulk download, run msnoise scan_archive to populate data availability before starting the pipeline.
Typical bulk-download sequence:
msnoise db init --from-yaml project.yaml
msnoise utils import-stationxml https://... # populates locs/chans
msnoise utils download # calls mass_download()
msnoise scan_archive --init
msnoise utils run_workflow
SDS root resolution (in order):
Explicit --sds-path CLI option.
The single unambiguous local SDS DataSource URI in the database.
./SDS (with a warning).
StationXML files are written to <sds_root>/../stationxml/ and never overwritten.
- msnoise.core.fdsn.build_client(ds)
Build an ObsPy client for the given DataSource.
- Parameters:
ds – DataSource ORM object.
- Returns:
An ObsPy client with a get_waveforms_bulk method.
- msnoise.core.fdsn.fetch_and_preprocess(db, jobs, goal_day, params, responses=None, loglevel='INFO', client=None)
Fetch waveforms from FDSN/EIDA for a batch of stations on one day.
Groups jobs by DataSource (all jobs in a batch share the same data_source_id when group_by="day_lineage_datasource" is used), issues one get_waveforms_bulk call, splits the result by station, and optionally writes raw files.
- Parameters:
db – SQLAlchemy session.
jobs – List of Job ORM objects (all same day, same DataSource).
goal_day – Date string YYYY-MM-DD.
params – MSNoiseParams for this lineage.
responses – ObsPy Inventory for instrument response removal, or None.
loglevel – Logging level string.
- Returns:
Tuple (stream, done_jobs, failed_jobs) where stream contains preprocessed traces for all successfully fetched stations.
- msnoise.core.fdsn.fetch_raw_waveforms(db, jobs, goal_day, params, t_start=None, t_end=None, client=None)
Fetch raw (unprocessed) waveforms from FDSN/EIDA for a batch of stations.
Issues one get_waveforms_bulk call covering all stations in jobs, optionally writes a raw file cache (fdsn_keep_raw=Y), and returns the combined Stream. No preprocessing is applied — the caller receives exactly what the FDSN server returns.
Use this when downstream processing needs raw data (e.g. PSD computation via ObsPy PPSD, which handles its own response correction internally). For pre-processing + response removal use fetch_and_preprocess().
- Parameters:
db – SQLAlchemy session.
jobs – List of Job ORM objects (all same DataSource).
goal_day – Date string YYYY-MM-DD.
params – MSNoiseParams for this lineage.
t_start – Optional UTCDateTime override for the fetch window start (default: midnight of goal_day).
t_end – Optional UTCDateTime override for the fetch window end (default: midnight + 86400 s).
- Returns:
Stream (may be empty on failure).
- msnoise.core.fdsn.fetch_waveforms_bulk(client, bulk_request, retries=3)
Issue a bulk waveform request with retry logic.
- Parameters:
client – ObsPy FDSN/EIDA client.
bulk_request – List of (net, sta, loc, chan, t1, t2) tuples.
retries – Number of retry attempts for transient errors.
- Returns:
Stream (may be empty on failure).
- Raises:
Re-raises auth errors and no-data exceptions immediately.
- msnoise.core.fdsn.is_remote_source(uri: str) bool
Return True if the DataSource URI points to a remote service.
- msnoise.core.fdsn.parse_datasource_scheme(uri: str) str
Return the scheme of a DataSource URI.
- Parameters:
uri – DataSource.uri string.
- Returns:
One of "local", "sds", "fdsn", or "eida".
- msnoise.core.fdsn.get_auth(auth_env: str) dict
Read credentials from environment variables for the given prefix.
Looks up:
- {auth_env}_FDSN_USER
- {auth_env}_FDSN_PASSWORD
- {auth_env}_FDSN_TOKEN (path to EIDA token file, or token string)
- Parameters:
auth_env – Env var prefix (e.g. "MSNOISE", "IRIS").
- Returns:
Dict with keys user, password, token (any may be None).
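The lookup amounts to three os.environ reads. `get_auth_sketch` below is an illustrative reimplementation under that assumption:

```python
import os

def get_auth_sketch(auth_env):
    """Illustrative version of get_auth(): read FDSN credentials from
    {auth_env}_FDSN_USER / _PASSWORD / _TOKEN environment variables."""
    return {
        "user": os.environ.get(f"{auth_env}_FDSN_USER"),
        "password": os.environ.get(f"{auth_env}_FDSN_PASSWORD"),
        "token": os.environ.get(f"{auth_env}_FDSN_TOKEN"),
    }
```

Unset variables simply come back as None, which matches the documented return contract.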
- msnoise.core.fdsn.mass_download(session, schema, sds_root=None, startdate_override=None, enddate_override=None)
Download waveforms for all remote DataSources into an SDS archive.
Builds an ObsPy Inventory from the station table and passes it as limit_stations_to_inventory — the MassDownloader gates at station level server-side, avoiding any NSLC cartesian blowup. Channel codes are the union of all Station.chans(). chunklength_in_sec=86400 produces day-aligned SDS files. StationXML is stored under <sds_root>/../stationxml/ and never overwritten. sanitize=False prevents discarding traces for missing response.
- Parameters:
session – SQLAlchemy session.
schema – Declared schema object (from declare_tables()).
sds_root – SDS write root as Path, or None to auto-resolve (see _resolve_sds_root()).
startdate_override – Override project startdate (YYYY-MM-DD).
enddate_override – Override project enddate (YYYY-MM-DD).
- Raises:
ValueError – If no remote DataSource or no valid stations found.
- exception msnoise.core.fdsn.FDSNConnectionError
Raised when an FDSN request fails due to a connection-level error.
Distinct from FDSNNoDataException (no matching data) and auth errors (wrong credentials). Callers that cache client objects should catch this, invalidate the cache, rebuild the client, and retry.
Project Archive I/O
Low-level I/O helpers for MSNoise project archives.
A project archive is a .tar.zst file containing a full MSNoise project
at a given pipeline level (preprocess / cc / stack / dvv …). It is distinct
from an MSNoiseResult result bundle (single
lineage, params.yaml + _output/).
This module handles filesystem operations (extract, export, checksum, job
reconstruction). Higher-level logic lives in msnoise.project.
Archive format
Every .tar.zst archive has this internal layout:
meta.yaml ← {entry_level, msnoise_version, created_at, project_name}
MANIFEST.json ← {relative_path: {sha256, size_bytes}} for every file
project.yaml ← full MSNoise config (importable)
<lineage>/<step>/_output/... ← pipeline outputs
<lineage>/<step>/params.yaml ← per-lineage params (enables DB-free access)
Paths inside the archive are relative to the project root, so extraction into any directory produces a valid project tree.
bundle_pointer.yaml format
msnoise_version_min: "2.0.1"
created_at: "2026-04-01"
levels:
stack:
description: "Stacked CCFs + refstacks for all filter sets"
url: "https://ftp.seismology.be/msnoise/study/level_stack.tar.zst"
sha256: "a1b2c3d4…"
size_gb: 6.8
dvv:
description: "Final DVV aggregates and per-pair series"
url: "https://zenodo.org/record/XXXXXXX/files/level_dvv.tar.zst"
sha256: "d4e5f6…"
size_gb: 0.3
url must be a plain HTTPS URL; sha256 is the hash of the
.tar.zst file itself (printed by export_project() on completion).
Typical workflow
Export:
from msnoise.core.project_io import export_project
sha = export_project("/path/to/project", "stack", "/data/level_stack.tar.zst")
# prints sha256 — paste into bundle_pointer.yaml
Import (Python):
from msnoise.core.project_io import import_project_archive
root = import_project_archive("bundle_pointer.yaml", "stack", "./my_project")
Import (CLI):
msnoise project import --from bundle_pointer.yaml --level stack \
--project-dir ./my_project --with-jobs
- msnoise.core.project_io.extract_archive(src: str | Path, dest: str | Path) Path
Extract a .tar.zst project archive into dest.
The archive may contain files at the root or inside a single top-level directory — both layouts are handled transparently. The extracted tree is always rooted at dest (no extra nesting introduced).
- Parameters:
src – Path to the .tar.zst file.
dest – Destination directory (created if absent).
- Returns:
Absolute Path to the extracted project root (i.e. the directory that contains project.yaml).
- Raises:
FileNotFoundError – if src does not exist.
ValueError – if src is not a recognised archive format.
- msnoise.core.project_io.file_sha256(path: str | Path) str
Return the hex SHA-256 digest of a file (streaming, memory-efficient).
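Streaming hashing like this is straightforward with hashlib. `file_sha256_sketch` below is an illustrative equivalent, reading fixed-size chunks so even multi-gigabyte archives never need to fit in memory:

```python
import hashlib
import os
import tempfile

def file_sha256_sketch(path, chunk_size=1 << 20):
    """Streaming SHA-256 of a file, reading 1 MiB chunks (illustrative)."""
    h = hashlib.sha256()
    with open(path, "rb") as f:
        while chunk := f.read(chunk_size):
            h.update(chunk)
    return h.hexdigest()
```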
- msnoise.core.project_io.scan_lineages(project_dir: str | Path, category: str) list[tuple[list[str], Path]]
Locate all computed lineages for category under project_dir.
Scans the directory tree for folders named <category>_N that contain an _output subdirectory. The lineage names are reconstructed from the relative path segments between project_dir and the matched folder.
- Parameters:
project_dir – MSNoise project root (contains project.yaml).
category – Category name without set number, e.g. "stack".
- Returns:
List of (lineage_names, step_dir) tuples, one per match. lineage_names is ordered from root to leaf (e.g. ["global_1", "preprocess_1", "cc_1", "filter_1", "stack_1"]). step_dir is the absolute Path to the matched <category>_N folder.
- msnoise.core.project_io.build_params_from_project_yaml(project_yaml_path: str | Path, lineage_names: list[str])
Build an MSNoiseParams from a project.yaml.
Reads project_yaml_path and extracts the config layers for each step in lineage_names. The returned object has the same structure as one produced by from_yaml().
- Parameters:
project_yaml_path – Path to project.yaml.
lineage_names – Ordered list of step names, e.g. ["global_1", "preprocess_1", "stack_1"].
- Returns:
MSNoiseParams with one layer per category in lineage_names.
- msnoise.core.project_io.LEVEL_GLOBS: dict[str, list[str]] = {'cc': ['preprocess_*/cc_*/_output/**'], 'dvv': ['**/mwcs_dtt_dvv_[0-9]*/_output/**', '**/stretching_dvv_[0-9]*/_output/**', '**/wavelet_dtt_dvv_[0-9]*/_output/**'], 'mwcs': ['**/mwcs_[0-9]*/_output/**', '**/mwcs_dtt_[0-9]*/_output/**'], 'preprocess': ['preprocess_*/_output/**'], 'stack': ['**/filter_*/stack_*/_output/**', '**/filter_*/refstack_*/_output/**'], 'stretching': ['**/stretching_[0-9]*/_output/**'], 'wavelet': ['**/wavelet_[0-9]*/_output/**', '**/wavelet_dtt_[0-9]*/_output/**']}
Glob patterns (relative to project root) for each entry level. Each pattern addresses the _output/ directory of the relevant steps.
- msnoise.core.project_io.LEVEL_CATEGORIES: dict[str, list[str]] = {'cc': ['cc'], 'dvv': ['mwcs_dtt_dvv', 'stretching_dvv', 'wavelet_dtt_dvv'], 'mwcs': ['mwcs', 'mwcs_dtt'], 'preprocess': ['preprocess'], 'stack': ['stack', 'refstack'], 'stretching': ['stretching'], 'wavelet': ['wavelet', 'wavelet_dtt']}
Categories whose outputs are present in each entry level. Used by reconstruct_jobs_from_filesystem() to know which steps to scan when inserting flag=D jobs.
- msnoise.core.project_io.export_project(project_dir: str | Path, level: str, output_path: str | Path) str
Export a project archive (.tar.zst) for the given entry level.
Collects all _output/ trees matching level (see LEVEL_GLOBS), generates a params.yaml alongside each matched step directory, writes meta.yaml and MANIFEST.json, and streams everything into a .tar.zst file.
- Parameters:
project_dir – MSNoise project root (contains project.yaml).
level – Entry level — one of the keys in LEVEL_GLOBS.
output_path – Destination .tar.zst file path (created/overwritten).
- Returns:
Hex SHA-256 of the written archive (paste into bundle_pointer.yaml).
- Raises:
ValueError – if level is not a recognised entry level.
FileNotFoundError – if project.yaml is absent from project_dir.
- msnoise.core.project_io.reconstruct_jobs_from_filesystem(session, schema, level: str, root: str | Path) int
Insert flag=D jobs by scanning the extracted _output/ tree.
After msnoise db init --from-yaml the jobs table is empty. This function synthetically populates it so that normal new_jobs propagation generates the correct downstream flag=T jobs.
- Parameters:
session – SQLAlchemy session (DB must already be initialised).
schema – Return value of declare_tables().
level – Entry level string (key of LEVEL_CATEGORIES).
root – Project root directory.
- Returns:
Total number of flag=D jobs inserted.
- Raises:
ValueError – if level is unknown.
- msnoise.core.project_io.export_project_levels(project_dir: str | Path, levels: str | list[str], output_dir: str | Path, url_base: str = '') dict
Export one .tar.zst archive per entry level and write bundle_pointer.yaml.
Levels with no matching _output/ content are skipped silently.
- Parameters:
project_dir – MSNoise project root.
levels – Level(s) to export: a single level name, a list, or "all" to attempt every level in LEVEL_GLOBS.
output_dir – Directory where archives and bundle_pointer.yaml are written (created if absent).
url_base – Optional URL prefix for the url fields in bundle_pointer.yaml. If supplied, URLs are formed as <url_base>/<filename>. Placeholders are written when omitted.
- Returns:
Dict mapping level name → {"path": Path, "sha256": str} for every exported archive.
- Raises:
ValueError – if levels contains an unrecognised level name.
ValueError – if levels contains an unrecognised level name.
- msnoise.core.project_io.import_project_archive(pointer_path: str | Path, level: str | list[str], project_dir: str | Path) Path
Download and extract one or more project archives from a bundle_pointer.yaml.
- Parameters:
pointer_path – Path to bundle_pointer.yaml.
level – Entry level(s) to import. Pass a single string (e.g. "stack"), a list (["stack", "dvv"]), or "all" to download every level listed in the pointer.
project_dir – Destination directory (created if absent). All archives are extracted into the same directory — their _output/ trees never overlap.
- Returns:
Absolute path to the extracted project root.
- Raises:
KeyError – if a requested level is absent from bundle_pointer.yaml.
ValueError – if a downloaded archive fails the SHA-256 check.