#!/usr/bin/env python3
#
# Copyright (C) 2022 Madeline Wade, Aaron Viets
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

"""
This pipeline takes in a calibrated strain channel produced by another process (likely the CALCS system in the front-end) and compensates the strain for known issues, such as correcting time stamps and CALCS filter inaccuracies.  Additionally, this pipeline completes the calculation of a state vector indicating the fidelity of the strain channel over time. 
"""

import sys
import h5py
import resource
import time

from optparse import OptionParser, Option
import configparser

import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst
GObject.threads_init()
Gst.init(None)

import lal

from gstlal import pipeparts
from gstlalcalibration import calibration_parts
from gstlal import datasource
from gstlalcalibration import calibhandler
from gstlal import simplehandler

#
# Function definition for writing pipeline graph
#

def write_graph(demux):
	pipeparts.write_dump_dot(pipeline, "%s.%s" % (DebuggingConfigs["pipelinegraphfilename"], "PLAYING"), verbose=True)

#
# Make sure we have sufficient resources
# We allocate far more memory than we need, so this is okay
#

def setrlimit(res, lim):
	hard_lim = resource.getrlimit(res)[1]
	resource.setrlimit(res, (lim if lim is not None else hard_lim, hard_lim))

if sys.platform == "linux":
	# set the number of processes and total set size up to hard limit and
	# shrink the per-thread stack size (default is 10 MiB)
	setrlimit(resource.RLIMIT_NPROC, None)
	setrlimit(resource.RLIMIT_AS, None)
	setrlimit(resource.RLIMIT_RSS, None)
	setrlimit(resource.RLIMIT_STACK, 1024*1024)

#
# Function definition to obtain the current GPS time
#

def now():
	return lal.LIGOTimeGPS(lal.UTCToGPS(time.gmtime()), 0)

#############################################################################
##################### Program Command Line Options ##########################
#############################################################################

parser = OptionParser(description = __doc__)

# Append program specific options

parser.add_option("--gps-start-time", metavar = "seconds", help = "Set the start time of the segment to analyze in GPS seconds. This is required iff DataSource is frames")
parser.add_option("--gps-end-time", metavar = "seconds", help = "Set the end time of the segment to analyze in GPS seconds. This is required iff DataSource is =frames")
parser.add_option("--frame-cache", metavar = "filename", help = "Set the name of the LAL cache listing the LIGO .gwf frame files (optional).  This is required iff DataSource is frames")
parser.add_option("--output-path", metavar = "name", default = ".", help = "Set the output path for writing frame files. (Default=Current)")
parser.add_option("--wings", metavar = "seconds", default = 0, type = "int", help = "Number of seconds to trim off of the beginning and end of the output. Should only be used if DataSource is frames.")
parser.add_option("--frame-duration", metavar = "seconds", type = "int", default = 4, help = "Set the number of seconds for each frame. (Default = 4)")
parser.add_option("--frames-per-file", metavar = "count", type = "int", default = 1, help = "Set the number of frames per frame file. (Default = 1)")
parser.add_option("--config-file", metavar = "name", help = "Full path to configuration file for running.")

# Parse options
options, filenames = parser.parse_args()

#############################################################################
######################### Config File Options  ##############################
#############################################################################

def ConfigSectionMap(section):
	dict1 = {}
	options = Config.options(section)
	for option in options:
		try:
			dict1[option] = Config.get(section, option)
			if dict1[option] == -1:
				DebugPrint("skip: %s" % option)
		except:
			print("exception on %s!" % option)
			dict[option] = None
	return dict1

Config = configparser.ConfigParser()
Config.read(options.config_file)

InputConfigs = ConfigSectionMap("InputConfigurations")
OutputConfigs = ConfigSectionMap("OutputConfigurations")
CalibrationConfigs = ConfigSectionMap("CalibrationConfigurations")
DebuggingConfigs = ConfigSectionMap("DebuggingConfigurations")
ChannelNames = ConfigSectionMap("ChannelNames")
SampleRates = ConfigSectionMap("SampleRates")
Bitmasks = ConfigSectionMap("Bitmasks")
PipelineConfigs = ConfigSectionMap("PipelineConfigurations")
MonitorConfigs = ConfigSectionMap("MonitoringConfigurations")
FilterConfigs = ConfigSectionMap("FilterConfigurations")

# Sanity checks for command line options
data_sources = set(("frames", "lvshm"))

if InputConfigs["datasource"] not in data_sources:
	raise ValueError("DataSource must be one of %s" % ",".join(data_sources))

if InputConfigs["datasource"] == "frames" and options.frame_cache is None:
	raise ValueError("FrameCache must be specified when using DataSource: frames")

if int(options.wings != 0) and InputConfigs["datasource"] != "frames":
	raise ValueError("Wings can only be set when DataSource: frames")

if CalibrationConfigs["ifo"] is None:
	raise ValueError("must specify IFO")

if InputConfigs["datasource"] == "frames" and (options.gps_start_time is None or options.gps_end_time is None):
	raise ValueError("must specify --gps-start-time and --gps-end-time when DataSource: frames")

if options.gps_start_time is not None:
	if options.gps_end_time is None:
		raise ValueError("must provide both --gps-start-time and --gps-end-time")
	if InputConfigs["datasource"] == "lvshm":
		raise ValueError("cannot set --gps-start-time or --gps-end-time with DataSource: lvshm")	
	if int(options.gps_start_time) >= int(options.gps_end_time):
		raise ValueError("--gps-start-time must be < --gps-end-time: %s < %s" % (options.gps_start_time, options.gps_end_time))
elif options.gps_end_time is not None:
	raise ValueError("must provide both --gps-start-time and --gps-end-time")

###################################################################################################
######################################## Setup ####################################################
###################################################################################################

# Set up instrument and channel name info from command line options
instrument = CalibrationConfigs["ifo"]

if options.gps_start_time is not None and options.gps_end_time is not None:
	gps_start_time = int(options.gps_start_time)
	gps_end_time = int(options.gps_end_time)

if InputConfigs["datasource"] == "frames":
	start = gps_start_time
elif InputConfigs["datasource"] == "lvshm":
	tm = time.gmtime()
	start = int(lal.UTCToGPS(tm))

# Set up short-cut names for each of the sample rates used throughout the pipeline and establish caps string shortcuts
hoft_sr = int(SampleRates["hoftsr"]) # Sample rate for h(t)
calibstate_sr = int(SampleRates["calibstatesr"]) # Sample rate for the CALIB_STATE_VECTOR
obsintent_sr = int(SampleRates["obsintentsr"]) if "obsintentsr" in SampleRates else int(SampleRates["odcsr"]) # Sample rate of the obs intent bit channel that is read in
lownoise_sr = int(SampleRates["lownoisesr"]) if "lownoisesr" in SampleRates else int(SampleRates["odcsr"])
filterclock_sr_list = SampleRates["filterclocksrlist"].split(';') if "filterclocksrlist" in SampleRates else [lownoise_sr]
hwinj_sr = int(SampleRates["hwinjsr"]) if "hwinjsr" in SampleRates else int(SampleRates["odcsr"])
metrics_sr = int(SampleRates["metricssr"])  if "metricssr" in SampleRates else 1 # Sample rate for recording metrics from pipeline
calcssv_sr = int(SampleRates["calcssvsr"]) if "calcssvsr" in SampleRates else 1 # Sample rate for the CALCS state vector channel

hoft_caps = "audio/x-raw, format=F64LE, rate=%d, channel-mask=(bitmask)0x0" % hoft_sr
latency_metrics_caps = "audio/x-raw, format=F64LE, rate=%d, channel-mask=(bitmask)0x0" % metrics_sr
calibstate_caps = "audio/x-raw, format=U32LE, rate=%d, channel-mask=(bitmask)0x0" % calibstate_sr
calibstate_metrics_caps = "audio/x-raw, format=U32LE, rate=%d, channel-mask=(bitmask)0x0" % metrics_sr
obsintent_caps = "audio/x-raw, format=U32LE, rate=%d, channel-mask=(bitmask)0x0" % obsintent_sr
lownoise_caps = "audio/x-raw, format=U32LE, rate=%d, channel-mask=(bitmask)0x0" % lownoise_sr
filterclock_caps = []
for i in range(len(filterclock_sr_list)):
	filterclock_caps.append("audio/x-raw, format=U32LE, rate=%d, channel-mask=(bitmask)0x0" % int(filterclock_sr_list[i]))
hwinj_caps = "audio/x-raw, format=U32LE, rate=%d, channel-mask=(bitmask)0x0" % hwinj_sr
calcssv_caps = "audio/x-raw, format=U32LE, rate=%d, channel-mask=(bitmask)0x0" % calcssv_sr

# Short cut names for a few other items that appear numerous times
input_frame_duration = int(InputConfigs["inputframeduration"])
wait_time = float(InputConfigs["waittime"]) if ("waittime" in InputConfigs and InputConfigs["datasource"] == "lvshm") else 0.0
kafka_server = MonitorConfigs["kafkaserver"] if "kafkaserver" in MonitorConfigs else None
latency_suffix = MonitorConfigs["latencysuffix"] if "latencysuffix" in MonitorConfigs else None

# Set up string for the channels suffix and prefix as provided by the user
if OutputConfigs["chansuffix"] != "None":
	chan_suffix = OutputConfigs["chansuffix"]
else:
	chan_suffix = ""
chan_prefix = OutputConfigs["chanprefix"]

# Set up shortcut variables for boolean options
compute_calib_statevector = Config.getboolean("CalibrationConfigurations", "computecalibstatevector")
td = not Config.getboolean("PipelineConfigurations", "frequencydomainfiltering")
verbose = Config.getboolean("DebuggingConfigurations", "verbose")
file_check_sum = Config.getboolean("InputConfigurations", "filechecksum")
skip_bad_files = Config.getboolean("InputConfigurations", "skipbadfiles")
monitor_latency = Config.getboolean("DebuggingConfigurations", "testlatency") if "testlatency" in DebuggingConfigs else False
test_filters = Config.getboolean("DebuggingConfigurations", "testfilters") if "testfilters" in DebuggingConfigs else False

# Silence latency metric output from stdout if not running in "monitor_latency" mode
if monitor_latency:
	silent = False
else:
	silent = True

#
# Load in the filters file that contains filter coefficients, etc.
#

filtersfile = InputConfigs["filtersfilename"]

filters = h5py.File(filtersfile, "r")

highpass_filt = filters[FilterConfigs["highpassname"]][:] if "highpassname" in FilterConfigs else None
highpass_delay = int(filters[FilterConfigs["highpassdelay"]][()]) if "highpassdelay" in FilterConfigs else None
highpass_sr = int(filters[FilterConfigs["highpasssr"]][()]) if "highpasssr" in FilterConfigs else None
corr_filt = filters[FilterConfigs["correctionname"]][:] if "correctionname" in FilterConfigs else None
corr_delay = int(filters[FilterConfigs["correctiondelay"]][()]) if "correctiondelay" in FilterConfigs else None
corr_sr = int(filters[FilterConfigs["correctionsr"]][()]) if "correctionsr" in FilterConfigs else None

filters.close()

#
# Set up the appropriate channel list. In this section, we also fill a list called headkeys
# that will be the keys for the dictionary holding each pipeline branch name, and we set up
# a dictionary that will be populated with pipeline branch names based on the channel list.
#

head_dict = {}
channel_list = []
headkeys = []

# If we are computing the CALIB_STATE_VECTOR, we need input DQ channel(s)
if compute_calib_statevector:
	calcs_sv_channel_name = ChannelNames["calcssvchannel"] if "calcssvchannel" in ChannelNames else None
	obsintent_channel_name = ChannelNames["obsintentchannel"]
	obsintent_bitmask = int(Bitmasks["obsintentbitmask"])
	lownoise_channel_name = ChannelNames["lownoisestatechannel"]
	lownoise_bitmask = int(Bitmasks["lownoisebitmask"]) if "lownoisebitmask" in Bitmasks else int(Bitmasks["obsreadybitmask"])
	filterclock_channel_list = ChannelNames["filterclockchannellist"].split(';') if "filterclockchannellist" in ChannelNames else [lownoise_channel_name]
	filterclock_bitmask_list = Bitmasks["filterclockbitmasklist"].split(';') if "filterclockbitmasklist" in Bitmasks else [lownoise_bitmask]
	hwinj_channel_name = ChannelNames["hwinjchannel"]
	cbchwinj_bitmask = int(Bitmasks["cbchwinjbitmask"]) if "cbchwinjbitmask" in Bitmasks else None
	cbchwinj_offbitmask = int(Bitmasks["cbchwinjoffbitmask"]) if "cbchwinjoffbitmask" in Bitmasks else None
	bursthwinj_bitmask = int(Bitmasks["bursthwinjbitmask"]) if "bursthwinjbitmask" in Bitmasks else None
	bursthwinj_offbitmask = int(Bitmasks["bursthwinjoffbitmask"]) if "bursthwinjoffbitmask" in Bitmasks else None
	detcharhwinj_bitmask = int(Bitmasks["detcharhwinjbitmask"]) if "detcharhwinjbitmask" in Bitmasks else None
	detcharhwinj_offbitmask = int(Bitmasks["detcharhwinjoffbitmask"]) if "detcharhwinjoffbitmask" in Bitmasks else None
	stochhwinj_bitmask = int(Bitmasks["stochhwinjbitmask"]) if "stochhwinjbitmask" in Bitmasks else None
	stochhwinj_offbitmask = int(Bitmasks["stochhwinjoffbitmask"]) if "stochhwinjoffbitmask" in Bitmasks else None

	channel_list.append((instrument, obsintent_channel_name))
	headkeys.append("obsintent")
	if lownoise_channel_name != obsintent_channel_name:
		channel_list.append((instrument, lownoise_channel_name))
		headkeys.append("lownoise")
	if hwinj_channel_name != obsintent_channel_name and hwinj_channel_name != lownoise_channel_name:
		channel_list.append((instrument, hwinj_channel_name))
		headkeys.append("hwinj")
	if calcs_sv_channel_name != obsintent_channel_name and calcs_sv_channel_name != lownoise_channel_name and calcs_sv_channel_name != hwinj_channel_name and calcs_sv_channel_name is not None:
		channel_list.append((intrument, calcs_sv_channel_name))
		headkeys.append("calcs_sv")
	if len(filterclock_bitmask_list) != len(filterclock_channel_list) or len(filterclock_bitmask_list) != len(filterclock_sr_list):
		raise ValueError("FilterClockChannelList, FilterClockBitmaskList, and FilterClockSRList must all be the same length.")
	for i in range(len(filterclock_channel_list)):
		filterclock_channel_list[i] = filterclock_channel_list[i].split(',')
		for j in range(len(filterclock_channel_list[i])):
			if filterclock_channel_list[i][j] != obsintent_channel_name and filterclock_channel_list[i][j] != lownoise_channel_name and filterclock_channel_list[i][j] != calcs_sv_channel_name:
				channel_list.append((instrument, filterclock_channel_list[i][j]))
				headkeys.append(filterclock_channel_list[i][j])
		filterclock_sr_list[i] = int(filterclock_sr_list[i])
		filterclock_bitmask_list[i] = int(filterclock_bitmask_list[i])

# Also need to add front-end strain channel to channel list
channel_list.append((instrument, ChannelNames["calcsstrainchannel"]))
headkeys.append("calcsstrain")

####################################################################################################
####################################### Main Pipeline ##############################################
####################################################################################################

pipeline = Gst.Pipeline(name="gstlal_correct_strain")
mainloop = GObject.MainLoop()
if kafka_server is not None:
	handler = calibhandler.Handler(mainloop, pipeline, kafka_server = kafka_server, verbose = verbose, latency_suffix = latency_suffix)
else:
	handler = simplehandler.Handler(mainloop, pipeline)

# 
# Turn off debugging tools or verboseness
#

pipeparts.mkchecktimestamps = lambda pipeline, src, *args: src # comment this line out to turn on the checktimestamps debugging
if not verbose:
	pipeparts.mkprogressreport = lambda pipeline, src, *args: src

#
# Read in data from frames or shared memory
#

if InputConfigs["datasource"] == "lvshm": # Data is to be read from shared memory; "low-latency" mode
	src = pipeparts.mklvshmsrc(pipeline, shm_name = InputConfigs["shmpartition"], assumed_duration = input_frame_duration)
elif InputConfigs["datasource"] == "frames": # Data is to be read from frame files; "offline" mode
	src = pipeparts.mklalcachesrc(pipeline, location = options.frame_cache, cache_dsc_regex = instrument, use_mmap = True)

if monitor_latency:
	src = pipeparts.mktee(pipeline, src)
	src_latency = pipeparts.mklatency(pipeline, src, name = "%s_src" % OutputConfigs["frametype"], silent = silent)
	if kafka_server is not None:
		src_latency.connect("notify::current-latency", handler.latency_new_buffer)
	pipeparts.mkfakesink(pipeline, src_latency)

#
# Hook up the relevant channels to the demuxer
#

demux = pipeparts.mkframecppchanneldemux(pipeline, src, do_file_checksum = file_check_sum, skip_bad_files = skip_bad_files, channel_list = list(map("%s:%s".__mod__, channel_list)))

# Write the pipeline graph after pads have been hooked up to the demuxer
if DebuggingConfigs["pipelinegraphfilename"] != "None":
	demux.connect("no-more-pads", write_graph)

# Get everything hooked up and fill in discontinuities
for key, chan in zip(headkeys, channel_list):
	head_dict[key] = calibration_parts.hook_up(pipeline, demux, chan[1], instrument, float(PipelineConfigs["bufferlength"]), wait_time = wait_time)

# Set up calcs strain channel
strain = calibration_parts.caps_and_progress(pipeline, head_dict["calcsstrain"], hoft_caps, "calcs_strain")
if compute_calib_statevector or test_filters:
	calcsstraintee = pipeparts.mktee(pipeline, strain)
	strain = calcsstraintee

# Zero out filter settling time
filter_settle_time = 0.0
filter_latency = 0.0

# Highpass filter the strain data, if this is a separate filter
if highpass_filt is not None:
	# FIXME: Might need to add logic if the highpass filter is at a lower SR than the strain data
	strain = calibration_parts.mkcomplexfirbank(pipeline, strain, latency = highpass_delay, fir_matrix = [highpass_filt[::-1]], time_domain = td)
	filter_settle_time += float(len(highpass_filt) - highpass_delay) / highpass_sr
	filter_latency += float(highpass_delay) / highpass_sr

if corr_filt is not None:
	# FIXME: Might need to add logic if the correction filter is at a lower sample rate than the strain data
	strain = calibration_parts.mkcomplexfirbank(pipeline, strain, latency = corr_delay, fir_matrix = [corr_filt[::-1]], time_domain = td)
	filter_settle_time += float(len(corr_filt) - corr_delay) / corr_sr
	filter_latency += float(corr_delay) / corr_sr

if monitor_latency:
	strain = pipeparts.mktee(pipeline, strain)
	strain_latency = calibration_parts.mkresample(pipeline, strain, 0, False, latency_metrics_cap)
	strain_latency = pipeparts.mklatency(pipeline, strain_latency, name = "corrected_strain", silent = silent)
	if kafka_server is not None:
		strain_latency.connect("notify::current-latency", handler.latency_new_buffer)
	pipeparts.mkfakesink(pipeline, strain_latency)

if test_filters:
	strain = pipeparts.mktee(pipeline, strain)
	correction = calibration_parts.mkinterleave(pipeline, [strain, calcsstraintee])
	correction_delay = filter_settle_time
	correction_start = start + correction_delay
	correction_dur = gps_end_time - correction_start - filter_settle_time - 10 if InputConfigs["datasource"] == "frames" else 300
	num_correction_ffts = int((correction_dur - 56) / 8)
	correction_dur = 56 + 8 * num_correction_ffts
	correction = pipeparts.mkprogressreport(pipeline, correction, "progress_correction_%s" % instrument)
	calibration_parts.mktransferfunction(pipeline, correction, fft_length = 64* hoft_sr, fft_overlap = 56 * hoft_sr, num_ffts = num_correction_ffts, use_median = True, update_samples = 1e15, update_dleay_samples = correction_delay * hoft_sr, filename = "%s_correction_filters_transfer_function_%d-%d.txt" % (filters_name.split('/')[-1].replace('.', '-'), correction_start, correction_dur), name = "correction", use_fir_fft = True, fft_window_type = 0, frequency_resolution = 0.25)

strain = pipeparts.mkprogressreport(pipeline, strain, "progress_hoft_%s" % instrument)

# Put the units back to strain before writing to frames
straintagstr = "units=strain,channel-name=%sCALIB_STRAIN%s,instrument=%s" % (chan_prefix, chan_suffix, instrument)
strain = pipeparts.mktaginject(pipeline, strain, straintagstr)

#
# CALIB_STATE_VECTOR BRANCH
#

# Bit definitions
# FIXME: Update these
hoft_ok_bitnum = 0
obs_intent_bitnum = 1
lownoise_bitnum = 2
filters_ok_bitnum = 3
no_gap_bitnum = 4
no_stoch_inj_bitnum = 5
no_cbc_inj_bitnum = 6
no_burst_inj_bitnum = 7
no_detchar_inj_bitnum = 8
ktst_smooth_bitnum = 9
kpum_smooth_bitnum = 10
kuim_smooth_bitnum = 11
kc_smooth_bitnum = 12
fcc_smooth_bitnum = 13
fs_smooth_bitnum = 14
Qinv_smooth_bitnum = 15
sus_line3_coh_bitnum = 16
sus_line2_coh_bitnum = 17
sus_line1_coh_bitnum = 18
pcal_line1_coh_bitnum = 19
pcal_line2_coh_bitnum = 20
pcal_line4_coh_bitnum = 21
D_epics_bitnum = 22
A_epics_bitnum = 23
C_epics_bitnum = 24
misc_epics_bitnum = 25
line_sub_bitnum = 26
noise_sub_bitnum = 27
noise_sub_gate_bitnum = 28

if compute_calib_statevector:

	# Read in the CALCS state vector channel
	if calcs_sv_channel_name is not None:
		calcssvchannel = calibration_parts.caps_and_progress(pipeline, head_dict["calcssv"], calcs_sv_caps, "calcs_sv_%s" % instrument)
		calcssvchannel = pipeparts.mkcapsfilter(pipeline, calcssvchannel, calibstate_caps)
	
	#
	# OBS-INTENT bit branch
	#

	obsintentchannel = calibration_parts.caps_and_progress(pipeline, head_dict["obsintent"], obsintent_caps, "obs_intent_%s" % instrument)
	obsintentchanneltee = pipeparts.mktee(pipeline, obsintentchannel)
	obsintent = pipeparts.mkgeneric(pipeline, obsintentchanneltee, "lal_logicalundersample", required_on = obsintent_bitmask, status_out = pow(2,obs_intent_bitnum))
	obsintent = pipeparts.mkcapsfilter(pipeline, obsintent, calibstate_caps)
	obsintenttee = pipeparts.mktee(pipeline, obsintent)

	#
	# Nominal low-noise bit branch
	#
	
	lownoisechanneltee = obsintentchanneltee if lownoise_channel_name == obsintent_channel_name else pipeparts.mktee(pipeline, calibration_parts.caps_and_progress(pipeline, head_dict["lownoise"], lownoise_caps, "low_noise_state_%s" % instrument))
	lownoise = pipeparts.mkgeneric(pipeline, lownoisechanneltee, "lal_logicalundersample", required_on = lownoise_bitmask, status_out = pow(2,lownoise_bitnum))
	lownoise = pipeparts.mkcapsfilter(pipeline, lownoise, calibstate_caps)
	lownoisetee = pipeparts.mktee(pipeline, lownoise)

	#
	# Filters-OK bit branch
	#
	# Set the FILTERS-OK bit based on step-before transition to GRD-IFO_OK.
	# Take in a channel list and a corresponding bitmask list to use as determinator for when to start the clock for filter settling time
	filterclock_channels = []
	for i in range(len(filterclock_channel_list)):
		for key in headkeys:
			for j in range(len(filterclock_channel_list[i])):
				if ((filterclock_channel_list[i][j] != obsintent_channel_name and filterclock_channel_list[i][j] != lownoise_channel_name) and key in filterclock_channel_list[i]):
					filterclock_channel = calibration_parts.caps_and_progress(pipeline, head_dict[key], filterclock_caps[i], key)
					filterclock_channel = calibration_parts.mkqueue(pipeline, pipeparts.mkcapsfilter(pipeline, pipeparts.mkgeneric(pipeline, filterclock_channel, "lal_logicalundersample", required_on = filterclock_bitmask_list[i], status_out = 1), calibstate_caps))
					filterclock_channels.append(filterclock_channel)
				elif (filterclock_channel_list[i][j] == obsintent_channel_name and key == "obsintent"):
					filterclock_channels.append(calibration_parts.mkqueue(pipeline, pipeparts.mkcapsfilter(pipeline, pipeparts.mkgeneric(pipeline, obsintenttee, "lal_logicalundersample", required_on = pow(2,obs_intent_bitnum), status_out = 1), calibstate_caps)))
				elif (filterclock_channel_list[i][j] == lownoise_channel_name and key == "lownoise"):
					filterclock_channels.append(calibration_parts.mkqueue(pipeline, pipeparts.mkcapsfilter(pipeline, pipeparts.mkgeneric(pipeline, lownoisetee, "lal_logicalundersample", required_on = pow(2,lownoise_bitnum), status_out = 1), calibstate_caps)))
	if len(filterclock_channel_list) > 1:
		filtersok = pipeparts.mkadder(pipeline, tuple(filterclock_channels))
	else:
		filtersok = filterclock_channels[0]
	lownoise_gate = pipeparts.mkbitvectorgen(pipeline, lownoisetee, bit_vector = len(filterclock_channels), threshold = pow(2,lownoise_bitnum))
	lownoise_gate = pipeparts.mkcapsfilter(pipeline, lownoise_gate, calibstate_caps)
	filtersok = pipeparts.mkadder(pipeline, calibration_parts.list_srcs(lownoise_gate, filtersok))
	filtersok = pipeparts.mkbitvectorgen(pipeline, filtersok, bit_vector = 1, threshold=len(filterclock_channels))
	filtersok = pipeparts.mkcapsfilter(pipeline, filtersok, calibstate_caps)
	filtersoktee = pipeparts.mktee(pipeline, filtersok)
	filtersok = calibration_parts.mkgate(pipeline, filtersoktee, filtersoktee, 1, attack_length = -int(filter_settle_time * calibstate_sr))
	# The "hold" on FILTERS_OK turning off is still determined by the low noise state
	filtersok = calibration_parts.mkgate(pipeline, filtersok, lownoisetee, pow(2,lownoise_bitnum), hold_length = -int(filter_latency * calibstate_sr))
	filtersok = pipeparts.mkbitvectorgen(pipeline, filtersok, bit_vector = pow(2,filters_ok_bitnum), nongap_is_control = True)
	filtersok = pipeparts.mkcapsfilter(pipeline, filtersok, calibstate_caps)

	#
	# No-invalid-input branch
	#

	# Check if the DQ state vector is present
	nogap = pipeparts.mkbitvectorgen(pipeline, obsintentchanneltee, threshold=1, bit_vector = 1)
	nogap = pipeparts.mkcapsfilter(pipeline, nogap, obsintent_caps)
	nogap = pipeparts.mkgeneric(pipeline, nogap, "lal_logicalundersample", required_on = 1, status_out = 1)
	nogap = pipeparts.mkcapsfilter(pipeline, nogap, calibstate_caps)
	# Check if any of the input data channels had to be replaced by zeroes because they were < 1e-35
	calcsstrainok = pipeparts.mkbitvectorgen(pipeline, calcsstraintee, threshold=1e-35, bit_vector=1)
	calcsstrainok = pipeparts.mkcapsfilter(pipeline, calcsstrainok, "audio/x-raw, format=U32LE, rate=%d" % hoft_sr)
	calcsstrainok = pipeparts.mkgeneric(pipeline, calcsstrainok, "lal_logicalundersample", required_on = 1, status_out = 1)
	calcsstrainok = pipeparts.mkcapsfilter(pipeline, calcsstrainok, calibstate_caps)
	noinvalidinput = calibration_parts.mkadder(pipeline, calibration_parts.list_srcs(pipeline, nogap, calcsstrainok))
	noinvalidinput = pipeparts.mkbitvectorgen(pipeline, noinvalidinput, threshold=3, bit_vector=pow(2, no_gap_bitnum))
	noinvalidinput = pipeparts.mkcapsfilter(pipeline, noinvalidinput, calibstate_caps)
	noinvalidinput = pipeparts.mktee(pipeline, noinvalidinput)
	 # inputs that are replaced with zeros affect h(t) for a short time before and after the zeros, so we also must account for this corrupted time.
	noinvalidinput = calibration_parts.mkgate(pipeline, noinvalidinput, noinvalidinput, pow(2, no_gap_bitnum), attack_length = -int(filter_settle_time * calibstate_sr), hold_length = -int(filter_latency * calibstate_sr))

	#
	# h(t)-OK bit branch
	#

	# First combine higher order bits to determine h(t)-OK
	higherbits_list = [filtersok, lownoisetee, noinvalidinput]
	htok_threshold = pow(2,lownoise_bitnum) + pow(2,filters_ok_bitnum) + pow(2,no_gap_bitnum)
	# FIXME: Add in the logic from the CALCS sv here that is needed to determine top-level h(t)-ok bit
	higherbits = calibration_parts.mkadder(pipeline, tuple(higherbits_list))
	higherbitstee = pipeparts.mktee(pipeline, higherbits)
	# Now calculate h(t)-OK bit
	htok = pipeparts.mkbitvectorgen(pipeline, higherbitstee, bit_vector = 1, threshold = htok_threshold)
	htok = pipeparts.mkcapsfilter(pipeline, htok, calibstate_caps)

	#
	# HW injection bits
	#

	hwinjchanneltee = obsintentchanneltee if hwinj_channel_name == obsintent_channel_name else lownoisechanneltee if hwinj_channel_name == lownoise_channel_name else pipeparts.mktee(pipeline, calibration_parts.caps_and_progress(pipeline, head_dict["hwinj"], hwinj_caps, "HW_injections_%s" % instrument))

	if cbchwinj_bitmask is not None:
		 hwinjcbc = pipeparts.mkgeneric(pipeline, hwinjchanneltee, "lal_logicalundersample", required_on = cbchwinj_bitmask, status_out = pow(2,no_cbc_inj_bitnum))
	elif cbchwinj_offbitmask is not None:
		hwinjcbc = pipeparts.mkgeneric(pipeline, hwinjchanneltee, "lal_logicalundersample", required_on = cbchwinj_offbitmask, invert_result = True, status_out = pow(2,no_cbc_inj_bitnum))
	else:
		raise ValueError("Must set either CBCHWInjBitmask or CBCHWInjOffBitmask")
	hwinjcbc = pipeparts.mkcapsfilter(pipeline, hwinjcbc, calibstate_caps)
	if bursthwinj_bitmask is not None:
		hwinjburst = pipeparts.mkgeneric(pipeline, hwinjchanneltee, "lal_logicalundersample", required_on = bursthwinj_bitmask, status_out = pow(2,no_burst_inj_bitnum))
	elif bursthwinj_offbitmask is not None:
		hwinjburst = pipeparts.mkgeneric(pipeline, hwinjchanneltee, "lal_logicalundersample", required_on = bursthwinj_offbitmask, invert_result = True, status_out = pow(2,no_burst_inj_bitnum))
	else:
		raise ValueError("Must set either BurstHWInjBitmask or BurstHWInjOffBitmask")
	hwinjburst = pipeparts.mkcapsfilter(pipeline, hwinjburst, calibstate_caps)

	if detcharhwinj_bitmask is not None:
		hwinjdetchar = pipeparts.mkgeneric(pipeline, hwinjchanneltee, "lal_logicalundersample", required_on = detcharhwinj_bitmask, status_out = pow(2,no_detchar_inj_bitnum))
	elif detcharhwinj_offbitmask is not None:
		hwinjdetchar = pipeparts.mkgeneric(pipeline, hwinjchanneltee, "lal_logicalundersample",  required_on = detcharhwinj_offbitmask, invert_result = True, status_out = pow(2,no_detchar_inj_bitnum))
	else:
		raise ValueError("Must set either DetCharHWInjBitmask or DetcharHWInjOffBitmask")
	hwinjdetchar = pipeparts.mkcapsfilter(pipeline, hwinjdetchar, calibstate_caps)

	if stochhwinj_bitmask is not None:
		hwinjstoch = pipeparts.mkgeneric(pipeline, hwinjchanneltee, "lal_logicalundersample", required_on = stochhwinj_bitmask, status_out = pow(2,no_stoch_inj_bitnum))
	elif stochhwinj_offbitmask is not None:
		hwinjstoch = pipeparts.mkgeneric(pipeline, hwinjchanneltee, "lal_logicalundersample",  required_on = stochhwinj_offbitmask, invert_result = True, status_out = pow(2,no_stoch_inj_bitnum))
	else:
		raise ValueError("Must set either StochHWInjBitmask or StochHWInjOffBitmask")
	hwinjstoch = pipeparts.mkcapsfilter(pipeline, hwinjstoch, calibstate_caps)

	#
	# Combine all bits to make CALIB_STATE_VECTOR channel
	#

	# FIXME: Just adding the CALCSSV here as is, but this may not be how it gets combined 
	all_bits_list = [higherbitstee, obsintenttee, htok, hwinjcbc, hwinjburst, hwinjdetchar, hwinjstoch, calcssv] if calcs_sv_channel_name is not None else [higherbitstee, obsintenttee, htok, hwinjcbc, hwinjburst, hwinjdetchar, hwinjstoch]
	calibstatevector = calibration_parts.mkadder(pipeline, tuple(all_bits_list))

#
# CREATE MUXER AND HOOK EVERYTHING UP TO IT
#

channelmux_input_dict = {}
# Link the output DQ vectors up to the muxer, if applicable
if compute_calib_statevector:
	channelmux_input_dict["%s:%sCALIB_STATE_VECTOR%s" % (instrument, chan_prefix, chan_suffix)] = calibration_parts.mkqueue(pipeline, calibstatevector)
# Link the strain branch to the muxer
channelmux_input_dict["%s:%sCALIB_STRAIN%s" % (instrument, chan_prefix, chan_suffix)] = calibration_parts.mkqueue(pipeline, strain)

# FIXME: Pass along all of the channels we also want to save to frames from the CALCS model (maybe none?)

mux = pipeparts.mkframecppchannelmux(pipeline, channelmux_input_dict, frame_duration = options.frame_duration, frames_per_file = options.frames_per_file, compression_scheme = int(OutputConfigs["compressionscheme"]), compression_level = int(OutputConfigs["compressionlevel"]))

# Check that all frames are long enough, that they have all of the channels by requiring a certain amount of time from start-up, and that frames aren't written for times requested by the wings option
def check_complete_frames(pad, info, output_start, frame_duration, wings_start, wings_end):
	if verbose:
		print("Checking if frames are complete")
	buf = info.get_buffer()
	startts = lal.LIGOTimeGPS(0, buf.pts)
	duration = lal.LIGOTimeGPS(0, buf.duration)
	if not (startts % frame_duration == 0):
		if verbose:
			print("Dropping frame because it is not an integer multiple of frame duration")
		return Gst.PadProbeReturn.DROP
	if startts < output_start:
		if verbose:
			print("Dropping frame because start time %f is less than desired output start time %f" % (startts, output_start))
		return Gst.PadProbeReturn.DROP
	if duration != frame_duration:
		if verbose:
			print("Dropping frame because the duration %d is not equal to the required frame duration %d" % (duration, frame_duration))
		return Gst.PadProbeReturn.DROP
	if wings_start is not None and wings_end is not None:
		if startts < wings_start or (startts+duration) > wings_end:
			if verbose:
				print("Dropping frame because it lies outside of the wings time")
			return Gst.PadProbeReturn.DROP
	return Gst.PadProbeReturn.OK

# start time of first frame file is the desired start time + filter latency
output_start = start + int(filter_settle_time)

# If the wings option is set, need to also check that frames aren't written during the requested wing time
wings = int(options.wings)
if wings != 0:
	wings_start = gps_start_time + wings
	wings_end = gps_end_time - wings
	mux.get_static_pad("src").add_probe(Gst.PadProbeType.BUFFER, check_complete_frames, lal.LIGOTimeGPS(output_start,0), lal.LIGOTimeGPS(options.frame_duration*options.frames_per_file,0), lal.LIGOTimeGPS(wings_start, 0), lal.LIGOTimeGPS(wings_end, 0))
else:
	mux.get_static_pad("src").add_probe(Gst.PadProbeType.BUFFER, check_complete_frames, lal.LIGOTimeGPS(output_start,0), lal.LIGOTimeGPS(options.frame_duration*options.frames_per_file,0), None, None)

mux = pipeparts.mkprogressreport(pipeline, mux, "progress_sink_%s" % instrument)

if monitor_latency:
	mux = pipeparts.mktee(pipeline, mux)
	mux_latency = pipeparts.mklatency(pipeline, mux, name = "%s_sink" % OutputConfigs["frametype"], silent = silent)
	if kafka_server is not None:
		mux_latency.connect("notify::current-latency", handler.latency_new_buffer)
	pipeparts.mkfakesink(pipeline, mux_latency)

if OutputConfigs["datasink"] == "lvshm":
	pipeparts.mkgeneric(pipeline, mux, "gds_lvshmsink", sync=False, async_=False, shm_name = OutputConfigs["outputshmpartition"], num_buffers = int(OutputConfigs["numbuffers"]), blocksize = int(OutputConfigs["framesize"])*options.frame_duration*options.frames_per_file, buffer_mode = int(OutputConfigs["buffermode"]))
elif OutputConfigs["datasink"] == "frames":
	pipeparts.mkframecppfilesink(pipeline, mux, frame_type = OutputConfigs["frametype"], path = options.output_path, instrument = instrument)

# Run pipeline

if DebuggingConfigs["pipelinegraphfilename"] != "None":
	pipeparts.write_dump_dot(pipeline, "%s.%s" %(DebuggingConfigs["pipelinegraphfilename"], "NULL"), verbose = verbose)

# Seek the pipeline when necessary
if InputConfigs["datasource"] == "frames":
	if verbose:
		print("seeking GPS start and stop times ...", file=sys.stderr)
	if pipeline.set_state(Gst.State.READY) != Gst.StateChangeReturn.SUCCESS:
		raise RuntimeError("pipeline failed to enter READY state")
	datasource.pipeline_seek_for_gps(pipeline, gps_start_time, gps_end_time)

if verbose:
	print("setting pipeline state to playing ...", file=sys.stderr)
if pipeline.set_state(Gst.State.PLAYING) != Gst.StateChangeReturn.SUCCESS:
	raise RuntimeError("pipeline failed to enter PLAYING state")
else:
	if verbose:
		print("set to playing successfully")
if DebuggingConfigs["pipelinegraphfilename"] != "None":
	pipeparts.write_dump_dot(pipeline, "%s.%s" %(DebuggingConfigs["pipelinegraphfilename"], "PLAYING"), verbose = verbose)

if verbose:
	print("running pipeline ...", file=sys.stderr)

mainloop.run()

if pipeline.set_state(Gst.State.NULL) != Gst.StateChangeReturn.SUCCESS:
	raise RuntimeError("pipeline could not be set to NULL")
