Calculating Antibody Titers (Endpoint Titers)
What This Accomplishes
Calculating antibody titers in biological samples is very important in certain fields.
One commonly used approach is to express the result as an endpoint titer.
A typical method is to serially dilute the sample and take the dilution factor at which the absorbance drops below a cutoff as the endpoint titer.
For the cutoff, I usually use twice the mean absorbance of the blank wells (a quick way to compute this is shown below).
This post introduces a Python script that handles this calculation in one go.
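As a quick illustration of that cutoff convention, computing the cutoff from blank-well absorbances is a one-liner (the values below are made up):
import numpy as np

blank_ods = np.array([0.052, 0.048, 0.050])  # illustrative blank absorbances
cutoff = 2 * blank_ods.mean()                # cutoff = 2 x mean blank OD
print(f"Cutoff = {cutoff:.3f}")              # pass this to the script via --cutoff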
Script Overview
Core features
Analyzes absorbance data from ELISA assays
Curve fitting by 4PL and 5PL logistic regression
Automatic antibody titer calculation
Automatic graph generation (using matplotlib)
Input data support
Reads data from Excel (.xlsx) or CSV files
Batch processing of multiple samples
Automatic dilution factor calculation (supports formulas)
Analysis features
Automatic 4PL/5PL model selection (AIC-based; see the equations after this list)
Fit quality evaluation (R², Adjusted R², RMSE)
Automatic detection of outliers and invalid data
Titer calculation based on a cutoff value
Output features
Results written to an Excel file
Per-sample graph generation (PNG format)
Detailed log output (verbose mode)
Report of fitting statistics
Graph features
Fitted curve drawing
Plot of measured values
Cutoff line display
Visual indication of the antibody titer
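For reference, these are the two model equations exactly as implemented in the script's four_pl and five_pl functions, and the criterion computed in calculate_fit_metrics for automatic selection (n = number of points, k = number of parameters, RSS = residual sum of squares):

y = D + \frac{A - D}{1 + (x/C)^{B}} \quad \text{(4PL)}

y = D + \frac{A - D}{\bigl(1 + (x/C)^{B}\bigr)^{E}} \quad \text{(5PL)}

\mathrm{AIC} = n \ln(\mathrm{RSS}/n) + 2k

In auto mode, whichever model yields the lower AIC is kept.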
Environment Setup
Building a Conda virtual environment
nano endpoint-titer.yml
name: endpoint-titer-analysis
channels:
- conda-forge
- defaults
dependencies:
- python=3.12
- numpy
- pandas
- scipy
- openpyxl
  - matplotlib
  - pillow  # used by openpyxl when embedding images (normally installed with matplotlib anyway)
# Create the environment
conda env create -f endpoint-titer.yml
# Activate the environment
conda activate endpoint-titer-analysis
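Optionally, a quick check that the environment has everything the script imports:
python -c "import numpy, pandas, scipy, matplotlib, openpyxl; print('OK')"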
Script
The data file has the structure shown below.
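This is a sketch of the expected layout, reconstructed from the script's parsing logic (sample names and values are illustrative): row 1 carries the dilution factors in columns 2-13 (plain numbers or simple formulas such as '=B1*2'), row 2 is skipped, and samples start on row 3 with the sample name in column 1, one row per replicate (two consecutive rows per sample in duplicate mode).

(label)   100    200    400    800   1600   3200   6400  12800  25600  51200  102400  204800
(this row is skipped)
Sample1   2.85   2.61   2.10   1.45   0.88   0.47   0.25   0.14   0.09   0.06   0.05    0.04
Sample1   2.80   2.55   2.05   1.40   0.85   0.45   0.24   0.13   0.08   0.06   0.05    0.04
Sample2   ...

The full script: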
import openpyxl
import pandas as pd
import numpy as np
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
from matplotlib import font_manager
from scipy.optimize import curve_fit
from openpyxl.drawing.image import Image
import io
import argparse
from pathlib import Path
import sys
import platform
# Function to set the appropriate font based on the OS
def set_japanese_font():
system_os = platform.system()
if system_os == "Darwin": # macOS
jp_font_path = "/System/Library/Fonts/ヒラギノ角ゴシック W6.ttc" # Hiragino Sans
elif system_os == "Windows": # Windows
jp_font_path = "C:/Windows/Fonts/msgothic.ttc" # MS Gothic
else:
raise EnvironmentError("Unsupported operating system for this script")
# Load and set the font
jp_font = font_manager.FontProperties(fname=jp_font_path)
plt.rcParams['font.family'] = jp_font.get_name()
return jp_font
def four_pl(x, A, B, C, D):
"""4-Parameter Logistic Regression"""
return D + (A-D)/(1.0+((x/C)**B))
def five_pl(x, A, B, C, D, E):
"""5-Parameter Logistic Regression"""
return D + (A-D)/(1.0+((x/C)**B))**E
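# In both models, A is the upper asymptote (response at low dilution), D the
# lower asymptote, B the slope (Hill coefficient), C the inflection point
# (the dilution giving the mid-response), and E (5PL only) an asymmetry factor.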
def load_data(file_path, sheet_name='Sheet1', encoding='utf-8'):
"""
Function to load data from Excel or CSV files
Parameters:
-----------
file_path : str or Path
Input file path (Excel or CSV)
sheet_name : str
Sheet name for Excel files
encoding : str
CSV file encoding (default: utf-8)
Returns:
--------
pd.DataFrame
Loaded data
"""
file_path = Path(file_path)
file_extension = file_path.suffix.lower()
if file_extension in ['.xlsx', '.xls']:
# Load Excel file
wb = openpyxl.load_workbook(file_path)
sheet = wb[sheet_name]
data = []
for row in sheet.iter_rows(values_only=True):
data.append(row)
df = pd.DataFrame(data)
elif file_extension == '.csv':
# Load CSV file
try:
df = pd.read_csv(file_path, header=None, encoding=encoding)
except UnicodeDecodeError:
# Try with CP932 (Windows Japanese) if UTF-8 fails
if encoding == 'utf-8':
df = pd.read_csv(file_path, header=None, encoding='cp932')
else:
raise
        # Validate that the CSV matches the expected Excel-style layout
        if len(df.columns) < 13:
            raise ValueError("CSV file must have at least 13 columns (sample name + 12 data points)")
else:
raise ValueError(f"Unsupported file format: {file_extension}")
return df
def evaluate_dilution_rates(dilution_rates):
"""Evaluate and convert dilution rates to numerical values"""
evaluated_rates = []
for i, rate in enumerate(dilution_rates):
if isinstance(rate, (int, float)):
evaluated_rates.append(float(rate))
elif isinstance(rate, str):
if rate.startswith('='):
parts = rate.split('*')
if len(parts) == 2 and parts[1].isdigit():
if i == 0:
evaluated_rates.append(float(parts[1]))
else:
evaluated_rates.append(evaluated_rates[-1] * int(parts[1]))
else:
try:
evaluated_rates.append(float(eval(rate[1:])))
                except Exception:
print(f"Warning: Could not evaluate value '{rate}'")
return None
else:
try:
evaluated_rates.append(float(rate))
except ValueError:
print(f"Warning: Could not convert value '{rate}' to number")
return None
else:
print(f"Warning: Unknown type of value '{rate}'")
return None
return evaluated_rates
def calculate_fit_metrics(y_true, y_pred, n_params):
"""Calculate fitting quality metrics"""
n = len(y_true)
residuals = y_true - y_pred
rss = np.sum(residuals**2)
tss = np.sum((y_true - np.mean(y_true))**2)
r2 = 1 - (rss/tss)
adj_r2 = 1 - ((1-r2)*(n-1)/(n-n_params-1))
aic = n * np.log(rss/n) + 2 * n_params
bic = n * np.log(rss/n) + n_params * np.log(n)
rmse = np.sqrt(np.mean(residuals**2))
return {
'R2': r2,
'Adjusted_R2': adj_r2,
'AIC': aic,
'BIC': bic,
'RMSE': rmse
}
def get_initial_params(y_data, dilution_rates):
"""Optimized initial parameter estimation"""
A_init = np.max(y_data) * 1.05
D_init = np.min(y_data) * 0.95
B_init = 1.0
mid_response = (A_init + D_init) / 2
closest_idx = np.argmin(np.abs(y_data - mid_response))
C_init = dilution_rates[closest_idx]
E_init = 1.0
return {
'A': A_init,
'B': B_init,
'C': C_init,
'D': D_init,
'E': E_init
}
def fit_curve(x_data, y_data, method, init_params, verbose=False):
"""Execute and evaluate curve fitting"""
try:
if method == '4':
bounds = ([0, 0.5, 0, 0], [np.inf, 10, np.inf, np.inf])
p0 = [init_params['A'], init_params['B'], init_params['C'], init_params['D']]
popt, pcov = curve_fit(four_pl, x_data, y_data, p0=p0, bounds=bounds, maxfev=50000)
y_fit = four_pl(x_data, *popt)
n_params = 4
else:
bounds = ([0, 0.5, 0, 0, 0.5], [np.inf, 10, np.inf, np.inf, 5])
p0 = [init_params['A'], init_params['B'], init_params['C'], init_params['D'], init_params['E']]
popt, pcov = curve_fit(five_pl, x_data, y_data, p0=p0, bounds=bounds, maxfev=50000)
y_fit = five_pl(x_data, *popt)
n_params = 5
metrics = calculate_fit_metrics(y_data, y_fit, n_params)
if verbose:
print("\nFitting Results:")
print(f" R² = {metrics['R2']:.4f}")
print(f" Adjusted R² = {metrics['Adjusted_R2']:.4f}")
print(f" RMSE = {metrics['RMSE']:.4e}")
if metrics['R2'] < 0.99:
print(" Warning: R² is below 0.99. Please check fitting quality.")
return popt, pcov, metrics, y_fit
except RuntimeError as e:
raise RuntimeError(f"Fitting failed: {str(e)}")
def process_data_and_calculate_titer(file_path, sheet_name, output_path, cutoff, method, replicates=2, verbose=False, log_path=None, encoding='utf-8'):
"""
Process ELISA data and calculate titers
Parameters:
-----------
file_path : str
Path to input file (Excel or CSV)
sheet_name : str
Sheet name for Excel files
output_path : str
Path to output Excel file
cutoff : float
Cutoff value
method : str
Fitting method ('4', '5', 'auto')
replicates : int
Number of technical replicates (1: single, 2: duplicate)
verbose : bool
Whether to display detailed output
log_path : str, optional
Path to log file
encoding : str
Encoding for CSV files (default: utf-8)
"""
# Open log file
log_file = open(log_path, 'w', encoding='utf-8') if log_path and verbose else None
def log_print(*args, **kwargs):
if verbose:
print(*args, **kwargs)
if log_file:
output = ' '.join(str(arg) for arg in args)
if 'end' in kwargs:
output += kwargs['end']
else:
output += '\n'
log_file.write(output)
log_file.flush()
try:
if verbose:
log_print(f"Processing started: {file_path}")
log_print(f"File format: {Path(file_path).suffix.lower()}")
log_print(f"Method: {method}PL fitting")
log_print(f"Cutoff value: {cutoff}")
log_print(f"Number of technical replicates: {replicates}")
# Load data
df = load_data(file_path, sheet_name, encoding)
# Create output Excel file
output_wb = openpyxl.Workbook()
results_sheet = output_wb.active
results_sheet.title = "Results"
plots_sheet = output_wb.create_sheet("Plots")
if verbose:
log_print("\nData loading details:")
log_print(f"Total rows: {len(df)}")
log_print("First few rows:")
log_print(df.head())
# Get dilution rates from first row
dilution_rates = df.iloc[0, 1:13].values
if verbose:
log_print(f"\nFound dilution rates: {dilution_rates}")
evaluated_rates = evaluate_dilution_rates(dilution_rates)
if evaluated_rates is None:
raise ValueError("Dilution rate data contains invalid values.")
dilution_rates = np.array(evaluated_rates, dtype=float)
if verbose:
log_print(f"Evaluated dilution rates: {dilution_rates}")
# Sample data starts from third row
sample_names = df.iloc[2:, 0].values
df_data = df.iloc[2:, 1:13]
# Detect data blocks (based on number of replicates)
blocks = []
row = 0
while row < len(df_data):
try:
row_data = pd.to_numeric(df_data.iloc[row], errors='coerce')
                # One block is 8 rows in duplicate mode (4 samples x 2 replicates)
                # or 4 rows in single mode; '<=' lets a block end on the last row
                if not row_data.isna().all() and row + (8 * replicates // 2) <= len(df_data):
                    blocks.append((row, row + (8 * replicates // 2) - 1))
                    row += 8 * replicates // 2
else:
row += 1
except Exception as e:
if verbose:
log_print(f"Error while processing row {row}: {str(e)}")
row += 1
if verbose:
log_print(f"\nDetected data blocks:")
for i, (start, end) in enumerate(blocks):
log_print(f"Block {i+1}: rows {start+1} to {end+1}")
if not blocks:
raise ValueError("No valid data blocks found.")
results_df = pd.DataFrame(columns=[
'Sample', 'Titer', 'R2', 'Adjusted_R2', 'RMSE', 'Fitting_Method'
])
for block_idx, (start_row, end_row) in enumerate(blocks):
if verbose:
log_print(f"\nStarting processing of block {block_idx+1}:")
log_print(f"Row range: {start_row+1} to {end_row+1}")
block_data = df_data.iloc[start_row:end_row+1]
# Adjust data processing based on number of replicates
for sample_idx in range(0, end_row - start_row + 1, replicates):
try:
# Get replicate data
replicate_data = block_data.iloc[sample_idx:sample_idx+replicates]
replicate_numeric = replicate_data.apply(pd.to_numeric, errors='coerce')
y_data = replicate_numeric.mean().values
if verbose:
log_print(f"\n Processing sample {sample_idx//replicates + 1}:")
log_print(f" Data: {y_data}")
if np.isnan(y_data).any():
log_print(f"Warning: Sample {sample_idx//replicates + 1} contains invalid data")
continue
sample_name = sample_names[start_row + sample_idx]
if verbose:
log_print(f"Processing sample: {sample_name}")
process_rows = [start_row + sample_idx + i + 1 for i in range(replicates)]
log_print(f"Using data: average of rows {', '.join(map(str, process_rows))}")
init_params = get_initial_params(y_data, dilution_rates)
try:
if method == '4':
popt, pcov, metrics, y_fit = fit_curve(
dilution_rates, y_data, '4', init_params, verbose
)
final_method = '4'
elif method == '5':
popt, pcov, metrics, y_fit = fit_curve(
dilution_rates, y_data, '5', init_params, verbose
)
final_method = '5'
else:
metrics_4pl = None
metrics_5pl = None
try:
popt_4pl, _, metrics_4pl, y_fit_4pl = fit_curve(
dilution_rates, y_data, '4', init_params, verbose
)
except RuntimeError:
if verbose:
log_print("4PL fitting failed")
try:
popt_5pl, _, metrics_5pl, y_fit_5pl = fit_curve(
dilution_rates, y_data, '5', init_params, verbose
)
except RuntimeError:
if verbose:
log_print("5PL fitting failed")
if metrics_4pl and metrics_5pl:
if metrics_4pl['AIC'] < metrics_5pl['AIC']:
popt, metrics, y_fit = popt_4pl, metrics_4pl, y_fit_4pl
final_method = '4'
else:
popt, metrics, y_fit = popt_5pl, metrics_5pl, y_fit_5pl
final_method = '5'
elif metrics_4pl:
popt, metrics, y_fit = popt_4pl, metrics_4pl, y_fit_4pl
final_method = '4'
elif metrics_5pl:
popt, metrics, y_fit = popt_5pl, metrics_5pl, y_fit_5pl
final_method = '5'
else:
raise RuntimeError("Both fitting methods failed")
                        # np.interp needs ascending xp values; y_fit decreases with
                        # dilution, so both arrays are reversed (a cutoff outside the
                        # fitted range clamps to the nearest endpoint)
                        titer = np.interp(cutoff, y_fit[::-1], dilution_rates[::-1])
new_row = pd.DataFrame([{
'Sample': sample_name,
'Titer': titer,
'R2': metrics['R2'],
'Adjusted_R2': metrics['Adjusted_R2'],
'RMSE': metrics['RMSE'],
'Fitting_Method': f'{final_method}PL'
}])
results_df = pd.concat([results_df, new_row], ignore_index=True)
# Set the appropriate font before plotting
jp_font = set_japanese_font()
                        # Plot measured values with SEM error bars
                        # (replicate_numeric and y_data were already computed above)
                        y_errors = replicate_numeric.sem().values if replicates > 1 else np.zeros_like(y_data)
plt.figure(figsize=(10, 6))
plt.errorbar(dilution_rates, y_data,
yerr=y_errors,
fmt='o', label='Measured values',
capsize=5)
plt.semilogx(dilution_rates, y_fit, '-', label='Fitting curve')
plt.axhline(y=cutoff, color='r', linestyle='--', label='Cutoff')
plt.axvline(x=titer, color='g', linestyle='--', label='Antibody titer')
plt.xlabel('Dilution rate')
plt.ylabel('Absorbance')
plt.title(f'{sample_name} ({final_method}PL fitting)')
plt.legend()
plt.grid(True)
# Save plot to memory
img_buffer = io.BytesIO()
plt.savefig(img_buffer, format='png', dpi=300)
# Save as individual PNG file
plot_dir = Path(output_path).parent / 'plots'
                        plot_dir.mkdir(parents=True, exist_ok=True)
plot_path = plot_dir / f'{sample_name}_plot.png'
plt.savefig(plot_path, dpi=300, bbox_inches='tight')
plt.close()
                        # Place plot in Excel
                        img_buffer.seek(0)  # rewind before handing the buffer to openpyxl
                        img = Image(img_buffer)
                        img.width = 600
                        img.height = 400
                        # Stack images down the Plots sheet; ~22 Excel rows per
                        # 400 px image is an assumed spacing to avoid overlap
                        row_position = (len(results_df) - 1) * 22 + 2
                        plots_sheet.cell(row=row_position - 1, column=1, value=sample_name)
                        plots_sheet.add_image(img, f'A{row_position}')
if verbose:
log_print(f"Plot placement: Sample {sample_name} placed at row {row_position}")
except Exception as e:
log_print(f"Warning: Error during fitting for {sample_name}: {str(e)}")
except Exception as e:
log_print(f"Warning: Error processing block {block_idx+1}, pair {pair_idx//2+1}: {str(e)}")
# Write results to Results sheet
for i, col in enumerate(results_df.columns):
results_sheet.cell(row=1, column=i+1, value=col)
for i, row in results_df.iterrows():
for j, value in enumerate(row):
results_sheet.cell(row=i+2, column=j+1, value=value)
# Adjust column widths
for column in results_sheet.columns:
max_length = 0
column = [cell for cell in column]
for cell in column:
try:
if len(str(cell.value)) > max_length:
max_length = len(str(cell.value))
                except Exception:
pass
adjusted_width = (max_length + 2)
results_sheet.column_dimensions[column[0].column_letter].width = adjusted_width
# Save workbook and return results
output_wb.save(output_path)
return len(results_df)
except Exception as e:
raise Exception(f"Error occurred during processing: {str(e)}")
finally:
if log_file:
log_file.close()
def parse_args():
parser = argparse.ArgumentParser(
description='ELISA Analysis Tool - Optimized Version',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Input file formats:
- Excel (.xlsx, .xls)
- CSV (.csv)
Note:
- Both formats require dilution rates in row 1 and data from row 3 onwards
- CSV files must have at least 13 columns (sample name + 12 data points)
- Output is always in Excel (.xlsx) format"""
)
parser.add_argument('--input', '-i', required=True,
help='Input file (Excel or CSV)')
parser.add_argument('--cutoff', '-c', type=float, required=True,
help='Cutoff value')
parser.add_argument('--method', '-m', choices=['4', '5', 'auto'],
default='auto',
help='Fitting method (4: 4PL, 5: 5PL, auto: automatic selection)')
parser.add_argument('--replicates', '-r', type=int, choices=[1, 2],
default=2,
help='Number of technical replicates (1: single, 2: duplicate)')
parser.add_argument('--verbose', '-v', action='store_true',
help='Display detailed output')
parser.add_argument('--encoding', '-e', default='utf-8',
help='CSV file encoding (default: utf-8)')
return parser.parse_args()
def main():
args = parse_args()
try:
input_path = Path(args.input)
# Check input file existence
if not input_path.exists():
print(f"Error: File '{input_path}' not found", file=sys.stderr)
return 1
# Check file format
if input_path.suffix.lower() not in ['.xlsx', '.xls', '.csv']:
print(f"Error: Unsupported file format: {input_path.suffix}", file=sys.stderr)
print("Supported formats: .xlsx, .xls, .csv", file=sys.stderr)
return 1
# Generate output filename (output in xlsx format even for CSV input)
output_path = input_path.parent / f'results_{input_path.stem}.xlsx'
# Set log file path
log_path = None
if args.verbose:
log_path = input_path.parent / f'analysis_log_{input_path.stem}.txt'
# Set sheet name based on file format
sheet_name = 'Sheet1' if input_path.suffix.lower() in ['.xlsx', '.xls'] else None
try:
num_samples = process_data_and_calculate_titer(
args.input,
sheet_name,
output_path,
args.cutoff,
args.method,
args.replicates,
args.verbose,
log_path,
encoding=args.encoding
)
print(f"Processing complete: Analyzed {num_samples} samples")
print(f"Results saved to {output_path}")
if args.verbose:
print(f"Analysis log saved to {log_path}")
except UnicodeDecodeError as e:
print(f"Encoding error: {str(e)}", file=sys.stderr)
print("Please specify appropriate encoding using --encoding option", file=sys.stderr)
print("Example: -e shift-jis or -e cp932", file=sys.stderr)
return 1
except ValueError as e:
print(f"Error: {str(e)}", file=sys.stderr)
return 1
except Exception as e:
print(f"Unexpected error occurred: {str(e)}", file=sys.stderr)
if args.verbose:
import traceback
traceback.print_exc()
return 1
except Exception as e:
print(f"Error occurred: {str(e)}", file=sys.stderr)
if args.verbose:
import traceback
traceback.print_exc()
return 1
return 0
if __name__ == '__main__':
exit(main())
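Before running the full script on real data, the core fit-and-interpolate logic can be sanity-checked in isolation. A minimal, self-contained sketch with synthetic, illustrative numbers (not real measurements):
import numpy as np
from scipy.optimize import curve_fit

def four_pl(x, A, B, C, D):
    """4-Parameter Logistic Regression (same form as in the script)"""
    return D + (A - D) / (1.0 + (x / C) ** B)

# Synthetic dilution series and absorbances (illustrative values only)
dilutions = np.array([100, 200, 400, 800, 1600, 3200, 6400, 12800], dtype=float)
od = np.array([2.80, 2.55, 2.05, 1.40, 0.85, 0.45, 0.24, 0.13])

popt, _ = curve_fit(four_pl, dilutions, od, p0=[2.9, 1.0, 800.0, 0.1], maxfev=50000)
y_fit = four_pl(dilutions, *popt)

# Endpoint titer: the dilution at which the fitted curve crosses the cutoff.
# np.interp needs ascending x-values, so both arrays are reversed.
cutoff = 0.5
titer = np.interp(cutoff, y_fit[::-1], dilutions[::-1])
print(f"Endpoint titer ≈ {titer:.0f}")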
Usage Examples
usage: endpoint_titer.py [-h] --input INPUT --cutoff CUTOFF [--method {4,5,auto}] [--replicates {1,2}] [--verbose] [--encoding ENCODING]
ELISA Analysis Tool - Optimized Version
options:
  -h, --help            show this help message and exit
  --input INPUT, -i INPUT
                        Input file (Excel or CSV)
  --cutoff CUTOFF, -c CUTOFF
                        Cutoff value
  --method {4,5,auto}, -m {4,5,auto}
                        Fitting method (4: 4PL, 5: 5PL, auto: automatic selection)
  --replicates {1,2}, -r {1,2}
                        Number of technical replicates (1: single, 2: duplicate)
  --verbose, -v         Display detailed output
  --encoding ENCODING, -e ENCODING
                        CSV file encoding (default: utf-8)
# Simplest usage
python endpoint_titer.py --input data.xlsx --cutoff 0.5
# Specify the fitting method
python endpoint_titer.py --input data.xlsx --cutoff 0.5 --method 4     # 4PL fitting
python endpoint_titer.py --input data.xlsx --cutoff 0.5 --method 5     # 5PL fitting
python endpoint_titer.py --input data.xlsx --cutoff 0.5 --method auto  # automatic selection
# Enable detailed log output
python endpoint_titer.py --input data.xlsx --cutoff 0.5 --verbose
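CSV input works the same way (results are still written as .xlsx). If the file is not UTF-8, for example a CSV exported from Japanese Excel, pass the encoding explicitly:
# CSV input with an explicit encoding
python endpoint_titer.py --input data.csv --cutoff 0.5 --encoding cp932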
Summary
With this, antibody titer calculation finishes easily, almost instantly.
Hopefully this frees up more time for education and research.
I have various other scripts useful for analysis, which I will introduce as the occasion arises!
If you have questions or suggestions for improvement, please let me know in the comments.