There is a phenomenon in financial markets that sits at the intersection of behavioral finance, market microstructure, and Islamic finance: herding. It's what happens when investors, individually rational, collectively abandon their own analysis and follow what other market participants are doing. Prices move not because fundamentals change, but because the act of moving attracts more movement.
Herding matters for Islamic equity markets for a reason that isn't obvious from the outside. The Shariah screening process (the 33% debt threshold, the sector exclusions, the revenue ratio limits) doesn't just filter assets. It filters investors. The pool of active participants in Malaysian Islamic equities is more homogeneous than the broader market: Islamic fund managers operate under the same SC-approved stock list, make portfolio decisions at similar times (quarterly rebalancing around SC list updates), and face the same constraints on what they can buy and sell. This structural homogeneity makes herding more likely, more intense when it occurs, and more detectable from price data.
The academic paper "Machine Learning vs. Human Investors" deploys neural networks and sentiment analysis to detect these patterns in US stocks versus Malaysian Shariah-compliant stocks. The key finding: herding is present in both markets, but the detection accuracy of ML models significantly exceeds that of classical regression in the Malaysian Islamic equity context, suggesting that the herding patterns are nonlinear and temporally structured in ways that OLS can't capture.
I want to build the full framework here: from the classical CSAD herding metric, through LSTM detection and a sentiment-augmented trading signal, to a backtested strategy that respects Islamic trading constraints. This is one of the more technically demanding posts I've written, so let me break it into stages.
Part 1: What Is Herding and How Do We Measure It?
Herding in equity markets means investors are making correlated decisions, buying and selling together, beyond what is explained by correlated information or fundamentals. The canonical measurement approach is the Cross-Sectional Absolute Deviation (CSAD) of returns, developed by Chang, Cheng, and Khorana (2000).
The intuition: if investors are acting independently on their own analysis, individual stock returns should disperse more when the market moves more (because stocks respond differently to news). If investors are herding, that dispersion should flatten or even decrease during large market moves, because everyone is doing the same thing regardless of fundamentals.
Define the market return on day t as the equal-weighted average return across N stocks:
Rₘ,t = (1/N) · Σᵢ Rᵢ,t
The CSAD on day t is the average absolute deviation of individual returns from the market:
CSAD_t = (1/N) · Σᵢ |Rᵢ,t - Rₘ,t|
Under a rational asset pricing model (CAPM-style), CSAD should be an increasing, linear function of |Rₘ,t|. Under herding, this relationship becomes sublinear: CSAD grows more slowly than |Rₘ,t|, and in extreme herding it may even decrease.
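To make the two definitions concrete, here is a small worked example using only the standard library. The two return cross-sections are made-up numbers chosen so that both days have the same market return; they are not drawn from any real data.

```python
from statistics import fmean


def market_return(returns):
    """Equal-weighted market return R_m,t for one day's cross-section."""
    return fmean(returns)


def csad(returns):
    """Cross-sectional absolute deviation around the equal-weighted mean."""
    r_m = market_return(returns)
    return fmean([abs(r - r_m) for r in returns])


# Hypothetical cross-sections: both days have a +2% market return.
dispersed_day = [0.05, 0.01, 0.03, -0.01, 0.02]      # stocks react differently
herded_day = [0.022, 0.018, 0.021, 0.019, 0.020]     # everyone moves together

print(market_return(dispersed_day), csad(dispersed_day))
print(market_return(herded_day), csad(herded_day))
```

Both days show the same 2% market move, but the dispersed day has a CSAD of 1.6% while the herded day has a CSAD of 0.12%. That compression of dispersion at a given market move is what the regression in the next step looks for.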
The Chang-Cheng-Khorana (CCK) regression tests for herding:
CSAD_t = α + β₁·|Rₘ,t| + β₂·Rₘ,t² + εₜ
If β₂ < 0 and statistically significant, herding is present. The nonlinear term Rₘ,t² captures the suppression of cross-sectional dispersion during large market moves.
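One way to read a negative β₂ is through the turning point of the fitted quadratic: setting the derivative β₁ + 2·β₂·|Rₘ| to zero gives |Rₘ|* = -β₁ / (2·β₂), the market move beyond which fitted dispersion starts to fall. The sketch below illustrates that arithmetic; the function names and the coefficient values are illustrative assumptions, not estimates from any dataset.

```python
def fitted_csad(alpha, beta1, beta2, abs_rm):
    """Fitted CSAD from the CCK specification for a given |R_m|."""
    return alpha + beta1 * abs_rm + beta2 * abs_rm ** 2


def csad_turning_point(beta1, beta2):
    """
    |R_m| at which fitted CSAD peaks. Only meaningful when beta2 < 0;
    returns None otherwise (the fitted curve has no interior maximum).
    """
    if beta2 >= 0:
        return None
    return -beta1 / (2 * beta2)


# Hypothetical coefficients, chosen only to make the arithmetic easy to follow.
alpha, beta1, beta2 = 0.01, 0.5, -5.0
peak = csad_turning_point(beta1, beta2)
print(peak)  # market move (in return units) where dispersion stops growing
print(fitted_csad(alpha, beta1, beta2, 0.04),
      fitted_csad(alpha, beta1, beta2, peak),
      fitted_csad(alpha, beta1, beta2, 0.06))
```

With these numbers the turning point is a 5% absolute market move. If the turning point implied by a real fit lies far outside the range of observed market returns, a negative β₂ indicates flattening rather than an outright decline in dispersion.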
Part 2: Core Data Structures and CSAD Engine
import polars as pl
import numpy as np
from dataclasses import dataclass, field
from typing import NamedTuple
from scipy import stats
import warnings

warnings.filterwarnings("ignore")


@dataclass(frozen=True)
class HerdingConfig:
    """Configuration for the herding detection pipeline."""
    min_stocks: int = 20              # minimum stocks for valid CSAD
    rolling_window: int = 60          # days for rolling regression
    confidence: float = 0.95          # statistical confidence threshold
    herding_threshold: float = -0.10  # β₂ threshold for herding classification
    sentiment_lag: int = 1            # days lag for sentiment signal
    lstm_lookback: int = 20           # LSTM input sequence length
    lstm_hidden: int = 32             # LSTM hidden units
    lstm_epochs: int = 100
    lstm_lr: float = 0.001

class CSADResult(NamedTuple):
    date: object
    csad: float
    market_return: float
    abs_market_ret: float
    n_stocks: int


def compute_csad(
    returns_wide: pl.DataFrame,  # date × stock_code (wide format)
    config: HerdingConfig = HerdingConfig(),
) -> pl.DataFrame:
    """
    Compute CSAD time series from wide-format returns DataFrame.
    Days with fewer than config.min_stocks valid returns are skipped.
    Returns long DataFrame with columns:
    date, csad, market_return, abs_market_ret, n_stocks, market_return_sq
    """
    date_col = "date"
    stock_cols = [c for c in returns_wide.columns if c != date_col]
    results = []
    for row in returns_wide.iter_rows(named=True):
        date = row[date_col]
        # Keep only stocks with a valid return on this day
        returns = np.array([row[c] for c in stock_cols
                            if row[c] is not None and not np.isnan(row[c])])
        if len(returns) < config.min_stocks:
            continue
        mkt_ret = float(np.mean(returns))                  # equal-weighted Rₘ,t
        csad = float(np.mean(np.abs(returns - mkt_ret)))   # CSAD_t
        results.append(CSADResult(
            date=date,
            csad=csad,
            market_return=mkt_ret,
            abs_market_ret=abs(mkt_ret),
            n_stocks=len(returns),
        ))
    return pl.DataFrame({
        "date": [r.date for r in results],
        "csad": [r.csad for r in results],
        "market_return": [r.market_return for r in results],
        "abs_market_ret": [r.abs_market_ret for r in results],
        "n_stocks": [r.n_stocks for r in results],
    }).with_columns([
        (pl.col("market_return") ** 2).alias("market_return_sq"),
    ])

@dataclass
class CCKRegressionResult:
    """Results from Chang-Cheng-Khorana herding regression."""
    alpha: float
    beta1: float              # coefficient on |Rₘ|
    beta2: float              # coefficient on Rₘ² - negative = herding
    beta1_pvalue: float
    beta2_pvalue: float
    r_squared: float
    n_obs: int
    herding_detected: bool
    herding_intensity: float  # magnitude of beta2 when negative

    def summary(self) -> str:
        verdict = "🔴 HERDING DETECTED" if self.herding_detected else "🟢 No herding"
        return (
            f" CCK Regression Results\n"
            f" {'─'*40}\n"
            f" α: {self.alpha:>+.6f}\n"
            f" β₁ (|Rₘ|): {self.beta1:>+.6f} (p={self.beta1_pvalue:.4f})\n"
            f" β₂ (Rₘ²): {self.beta2:>+.6f} (p={self.beta2_pvalue:.4f})\n"
            f" R²: {self.r_squared:.4f}\n"
            f" N obs: {self.n_obs}\n"
            f" {verdict}\n"
        )

def run_cck_regression(
    csad_df: pl.DataFrame,
    config: HerdingConfig = HerdingConfig(),
) -> CCKRegressionResult:
    """
    Run Chang-Cheng-Khorana (2000) herding regression.
    CSAD_t = α + β₁·|Rₘ,t| + β₂·Rₘ,t² + εₜ
    """
    df = csad_df.drop_nulls(["csad", "abs_market_ret", "market_return_sq"])
    y = df["csad"].to_numpy()
    X = np.column_stack([
        np.ones(len(df)),
        df["abs_market_ret"].to_numpy(),
        df["market_return_sq"].to_numpy(),
    ])
    # OLS via least squares (np.linalg.lstsq), not the explicit normal equations
    beta, residuals, rank, sv = np.linalg.lstsq(X, y, rcond=None)
    y_hat = X @ beta
    ss_res = np.sum((y - y_hat) ** 2)
    ss_tot = np.sum((y - np.mean(y)) ** 2)
    r2 = 1 - ss_res / (ss_tot + 1e-10)
    # Classical (homoskedastic) OLS standard errors: σ²·(XᵀX)⁻¹.
    # This is not a heteroskedasticity-robust sandwich estimator.
    n, k = X.shape
    mse = ss_res / (n - k)
    var_beta = mse * np.linalg.pinv(X.T @ X)
    se_beta = np.sqrt(np.diag(var_beta))
    t_stats = beta / (se_beta + 1e-10)
    # Two-sided p-values from the t distribution with n - k degrees of freedom
    p_values = 2 * (1 - stats.t.cdf(np.abs(t_stats), df=n - k))
    herding = bool(
        beta[2] < config.herding_threshold and
        p_values[2] < (1 - config.confidence)
    )
    return CCKRegressionResult(
        alpha=float(beta[0]),
        beta1=float(beta[1]),
        beta2=float(beta[2]),
        beta1_pvalue=float(p_values[1]),
        beta2_pvalue=float(p_values[2]),
        r_squared=float(r2),
        n_obs=n,
        herding_detected=herding,
        herding_intensity=float(-beta[2]) if herding else 0.0,
    )
Part 3: LSTM for Nonlinear Herding Detection
The CCK regression fits one static relationship over the whole sample. But herding in Islamic equities has temporal structure: it builds over days as more fund managers observe the same Shariah compliance signals and make correlated rebalancing decisions. An LSTM can capture this kind of sequence dependence.
import numpy as np
import polars as pl
from dataclasses import dataclass, field

# ─── Minimal LSTM in pure NumPy (no framework dependency) ────────────────────
# For production, replace with PyTorch or TensorFlow.
# This implementation is for transparency and reproducibility.


class LSTMCell:
    """Single LSTM cell - forward pass only (for inference)."""

    def __init__(self, input_size: int, hidden_size: int, seed: int = 42):
        rng = np.random.default_rng(seed)
        scale = 1 / np.sqrt(hidden_size)
        # Weight matrices: [W_input | W_hidden] concatenated
        self.Wf = rng.normal(0, scale, (hidden_size, input_size + hidden_size))
        self.Wi = rng.normal(0, scale, (hidden_size, input_size + hidden_size))
        self.Wo = rng.normal(0, scale, (hidden_size, input_size + hidden_size))
        self.Wg = rng.normal(0, scale, (hidden_size, input_size + hidden_size))
        self.bf = np.zeros(hidden_size)
        self.bi = np.zeros(hidden_size)
        self.bo = np.zeros(hidden_size)
        self.bg = np.zeros(hidden_size)
        self.hidden_size = hidden_size
        self.input_size = input_size

    def forward(self, x: np.ndarray, h: np.ndarray, c: np.ndarray):
        """Single LSTM step."""
        xh = np.concatenate([x, h])
        f = self._sigmoid(self.Wf @ xh + self.bf)  # forget gate
        i = self._sigmoid(self.Wi @ xh + self.bi)  # input gate
        o = self._sigmoid(self.Wo @ xh + self.bo)  # output gate
        g = np.tanh(self.Wg @ xh + self.bg)        # cell gate
        c_new = f * c + i * g
        h_new = o * np.tanh(c_new)
        return h_new, c_new

    @staticmethod
    def _sigmoid(x: np.ndarray) -> np.ndarray:
        return 1 / (1 + np.exp(-np.clip(x, -500, 500)))

    def set_weights(self, weights: dict) -> None:
        """Load pre-trained weights."""
        for attr, val in weights.items():
            setattr(self, attr, val)

@dataclass
class HerdingLSTM:
    """
    LSTM-based herding detector.
    Input features (per timestep), as listed in `features`:
    - Rolling CSAD z-score
    - |Rₘ|
    - Rₘ²
    - Rₘ (signed)
    - 5-day CSAD momentum
    Sentiment is not an LSTM input here; it enters later, in the signal generator.
    Output:
    - Herding probability [0, 1]
    - Herding regime label {0: normal, 1: mild, 2: strong}
    """
    config: HerdingConfig
    lstm: LSTMCell = None
    W_out: np.ndarray = None
    b_out: float = 0.0
    trained: bool = False
    features: list[str] = field(default_factory=lambda: [
        "csad_zscore", "abs_market_ret", "market_return_sq",
        "market_return", "csad_momentum",
    ])

    def __post_init__(self):
        self.lstm = LSTMCell(
            input_size=len(self.features),
            hidden_size=self.config.lstm_hidden,
            seed=42,
        )
        self.W_out = np.random.default_rng(42).normal(
            0, 0.1, self.config.lstm_hidden
        )

    def prepare_features(self, csad_df: pl.DataFrame) -> np.ndarray:
        """
        Build feature matrix from CSAD DataFrame.
        Returns array of shape (T, n_features).
        Leading rows without a full rolling window are dropped.
        """
        window = self.config.rolling_window  # 60 days by default
        df = csad_df.sort("date").with_columns([
            # Rolling z-score of CSAD
            ((pl.col("csad") - pl.col("csad").rolling_mean(window)) /
             (pl.col("csad").rolling_std(window) + 1e-8))
            .alias("csad_zscore"),
            # 5-day momentum of CSAD
            (pl.col("csad") - pl.col("csad").shift(5))
            .alias("csad_momentum"),
        ]).drop_nulls()
        return df.select(self.features).to_numpy()

    def _forward_sequence(self, X: np.ndarray) -> np.ndarray:
        """
        Run LSTM over a sequence of inputs.
        X: shape (T, n_features)
        Returns: herding probabilities, shape (T,)
        """
        T = X.shape[0]
        h = np.zeros(self.config.lstm_hidden)
        c = np.zeros(self.config.lstm_hidden)
        probs = np.zeros(T)
        for t in range(T):
            h, c = self.lstm.forward(X[t], h, c)
            logit = float(self.W_out @ h + self.b_out)
            probs[t] = 1 / (1 + np.exp(-logit))
        return probs

    def fit_supervised(
        self,
        csad_df: pl.DataFrame,
        herding_labels: np.ndarray,  # binary: 1 = herding day
        n_epochs: int = None,
        lr: float = None,
    ) -> list[float]:
        """
        Train the output layer with binary cross-entropy loss.
        Simplification: only W_out and b_out are updated by gradient descent;
        the LSTM weights keep their random initialisation (no BPTT, no clipping).
        Returns training loss history.
        """
        epochs = n_epochs or self.config.lstm_epochs
        lr = lr or self.config.lstm_lr
        X = self.prepare_features(csad_df)
        y = herding_labels[-len(X):]  # align with rows kept by prepare_features
        # LSTM weights are fixed, so the hidden states only need computing once
        h_seq = self._get_hidden_sequence(X)
        losses = []
        eps = 1e-7
        for epoch in range(epochs):
            probs = 1 / (1 + np.exp(-(h_seq @ self.W_out + self.b_out)))
            # Binary cross-entropy
            loss = -np.mean(
                y * np.log(probs + eps) + (1 - y) * np.log(1 - probs + eps)
            )
            losses.append(float(loss))
            # Gradient step on the output layer only
            grad = probs - y  # dL/d(logit)
            dW_out = h_seq.T @ grad / len(y)
            self.W_out -= lr * dW_out
            self.b_out -= lr * float(np.mean(grad))
            if (epoch + 1) % 20 == 0:
                print(f" Epoch {epoch+1:3d}/{epochs} - loss: {loss:.5f}")
        self.trained = True
        return losses

    def _get_hidden_sequence(self, X: np.ndarray) -> np.ndarray:
        """Run LSTM and collect all hidden states."""
        T = X.shape[0]
        h = np.zeros(self.config.lstm_hidden)
        c = np.zeros(self.config.lstm_hidden)
        hiddens = np.zeros((T, self.config.lstm_hidden))
        for t in range(T):
            h, c = self.lstm.forward(X[t], h, c)
            hiddens[t] = h
        return hiddens

    def predict_proba(self, csad_df: pl.DataFrame) -> np.ndarray:
        """Predict herding probability for each day."""
        X = self.prepare_features(csad_df)
        return self._forward_sequence(X)

    def predict_regime(
        self,
        csad_df: pl.DataFrame,
        mild_threshold: float = 0.40,
        high_threshold: float = 0.65,
    ) -> pl.DataFrame:
        """
        Classify each day into herding regime.
        0 = normal, 1 = mild herding, 2 = strong herding
        """
        probs = self.predict_proba(csad_df)
        n = len(probs)
        dates = csad_df.sort("date")["date"].to_list()[-n:]
        regimes = np.where(
            probs >= high_threshold, 2,
            np.where(probs >= mild_threshold, 1, 0)
        )
        return pl.DataFrame({
            "date": dates,
            "herding_prob": probs.tolist(),
            "herding_regime": regimes.tolist(),
            "regime_label": ["strong" if r == 2 else "mild" if r == 1
                             else "normal" for r in regimes],
        })
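To see how the gates let the cell carry information from one day to the next, it can help to run a single step by hand. The sketch below is a scalar (hidden size 1) version of the same gate equations, written with the standard library only; `scalar_lstm_step` and the parameter values are illustrative assumptions, not weights from a trained model.

```python
import math


def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))


def scalar_lstm_step(x, h, c, params):
    """
    One LSTM step with scalar input, hidden state and cell state.
    params maps each gate name ("f", "i", "o", "g") to (w_x, w_h, bias).
    """
    def pre(gate):
        w_x, w_h, b = params[gate]
        return w_x * x + w_h * h + b

    f = sigmoid(pre("f"))    # forget gate: how much old memory to keep
    i = sigmoid(pre("i"))    # input gate: how much new candidate to admit
    o = sigmoid(pre("o"))    # output gate: how much memory to expose
    g = math.tanh(pre("g"))  # candidate cell value
    c_new = f * c + i * g
    h_new = o * math.tanh(c_new)
    return h_new, c_new


# All-zero parameters: every sigmoid gate sits at 0.5 and the candidate is 0,
# so the previous cell state is simply halved.
zero = {gate: (0.0, 0.0, 0.0) for gate in "fiog"}
h1, c1 = scalar_lstm_step(x=0.0, h=0.0, c=1.0, params=zero)

# A large forget-gate bias keeps almost all of yesterday's memory.
sticky = dict(zero, f=(0.0, 0.0, 5.0))
h2, c2 = scalar_lstm_step(x=0.0, h=0.0, c=1.0, params=sticky)

print(c1, c2)
```

The forget gate is what allows a herding episode that started several days ago to keep influencing today's probability: with a gate near 1 the cell state decays slowly, with a gate near 0.5 it is halved each day.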
Part 4: Sentiment Analysis for Islamic Equity Markets
Sentiment in Islamic equity markets has an additional dimension: Shariah confidence. News about Shariah board decisions, zakat compliance, regulatory actions by the SC or BNM, and ESG-Islamic finance alignment affects investor sentiment in ways that standard market sentiment measures don't capture.
from dataclasses import dataclass
import polars as pl
import numpy as np
import re


@dataclass(frozen=True)
class SentimentLexicon:
    """
    Domain-specific sentiment lexicon for Islamic equity markets.
    Combines general financial sentiment with Islamic finance terms.
    Terms are stored as immutable tuples.
    """
    # General positive financial terms
    positive_financial: tuple[str, ...] = (
        "earnings beat", "revenue growth", "dividend increase",
        "profit", "outperform", "upgrade", "bullish", "rally",
        "strong", "robust", "record", "growth", "expansion",
    )
    # Islamic finance positive terms
    positive_islamic: tuple[str, ...] = (
        "shariah compliant", "halal certified", "zakat payment",
        "sukuk issuance", "waqf", "maqasid", "ethical", "sustainable",
        "islamic banking", "takaful", "murabahah", "musharakah",
        "shariah approved", "sc approved", "compliant",
    )
    # General negative financial terms
    negative_financial: tuple[str, ...] = (
        "earnings miss", "revenue decline", "loss", "downgrade",
        "bearish", "selloff", "weak", "disappointing", "default",
        "restructuring", "impairment", "write-down", "concern",
    )
    # Islamic finance negative / uncertainty terms
    negative_islamic: tuple[str, ...] = (
        "shariah non-compliant", "riba", "gharar", "maysir",
        "sc removed", "delisted", "excluded", "prohibited",
        "haram", "non-compliant", "screening failure",
        "shariah violation", "audit concern",
    )
    # Herding-specific terms
    herding_indicators: tuple[str, ...] = (
        "institutional buying", "fund flows", "retail investors",
        "following", "momentum", "trend", "crowded", "consensus",
        "herd", "bandwagon", "fomo", "panic selling", "rush",
    )

class IslamicSentimentScorer:
    """
    Compute composite sentiment score for Islamic equity news.
    Score = w_fin·S_fin + w_isl·S_isl + w_herd·S_herd
    Financial and Islamic components lie in [-1, +1]; the herding
    component has no negative terms, so it lies in [0, 1].
    """

    def __init__(
        self,
        lexicon: SentimentLexicon = None,
        weight_financial: float = 0.40,
        weight_islamic: float = 0.40,
        weight_herding: float = 0.20,
    ):
        self.lexicon = lexicon or SentimentLexicon()
        self.w_fin = weight_financial
        self.w_isl = weight_islamic
        self.w_herd = weight_herding

    @staticmethod
    def _count_hits(text_lower: str, terms: tuple) -> int:
        """
        Count lexicon terms present in the text. A term must start at a word
        boundary and must not follow a hyphen, so "compliant" is not counted
        inside "non-compliant" and "loss" is not counted inside "gloss".
        Suffixes are still allowed ("concern" matches "concerns").
        """
        return sum(
            1 for term in terms
            if re.search(r"(?<![\w-])" + re.escape(term), text_lower)
        )

    def _component_score(
        self,
        text: str,
        positives: tuple,
        negatives: tuple,
    ) -> float:
        """Score text on one sentiment dimension."""
        text_lower = text.lower()
        pos = self._count_hits(text_lower, positives)
        neg = self._count_hits(text_lower, negatives)
        total = pos + neg
        if total == 0:
            return 0.0
        return (pos - neg) / total

    def score_text(self, text: str) -> dict:
        """
        Score a single news item.
        Returns component scores and composite.
        """
        fin_score = self._component_score(
            text,
            self.lexicon.positive_financial,
            self.lexicon.negative_financial,
        )
        isl_score = self._component_score(
            text,
            self.lexicon.positive_islamic,
            self.lexicon.negative_islamic,
        )
        # Herding: positive score = high herding language
        herd_score = self._component_score(
            text,
            self.lexicon.herding_indicators,
            (),  # no negative herding terms
        )
        composite = (
            self.w_fin * fin_score +
            self.w_isl * isl_score +
            self.w_herd * herd_score
        )
        return {
            "financial_sentiment": fin_score,
            "islamic_sentiment": isl_score,
            "herding_sentiment": herd_score,
            "composite_sentiment": composite,
        }

    def score_news_batch(
        self,
        news_df: pl.DataFrame,  # columns: date, headline, body (optional)
    ) -> pl.DataFrame:
        """
        Score a batch of news items and aggregate by date.
        Returns daily sentiment scores.
        """
        text_col = "headline"
        if "body" in news_df.columns:
            # Combine headline (weighted 2x) and body; a missing body counts as empty
            news_df = news_df.with_columns([
                (pl.col("headline") + " " + pl.col("headline") + " "
                 + pl.col("body").fill_null(""))
                .alias("full_text")
            ])
            text_col = "full_text"
        scores = []
        for row in news_df.iter_rows(named=True):
            s = self.score_text(row[text_col])
            scores.append({
                "date": row["date"],
                **s,
            })
        return (
            pl.DataFrame(scores)
            .group_by("date")
            .agg([
                pl.col("financial_sentiment").mean(),
                pl.col("islamic_sentiment").mean(),
                pl.col("herding_sentiment").mean(),
                pl.col("composite_sentiment").mean(),
                pl.len().alias("n_articles"),
            ])
            .sort("date")
        )
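One caveat with lexicon scoring deserves a concrete check: matching by plain substring can count a term inside its own negation. The sketch below compares plain substring matching with a left-boundary regular expression on two headlines. The helper names and the second headline are illustrative assumptions, and the boundary rule shown is one possible choice rather than the only reasonable one.

```python
import re


def count_substring(text, terms):
    """Count terms found anywhere in the text, including inside other words."""
    lowered = text.lower()
    return sum(1 for term in terms if term in lowered)


def count_left_boundary(text, terms):
    """
    Count terms that start at a word boundary and do not follow a hyphen.
    Suffixes are still allowed, so "concern" matches "concerns".
    """
    lowered = text.lower()
    return sum(
        1 for term in terms
        if re.search(r"(?<![\w-])" + re.escape(term), lowered)
    )


positive_terms = ("shariah compliant", "compliant")
negative_terms = ("loss", "concern")

negated = "Shariah non-compliant concerns raise flags"
harmless = "Glossy annual report released"

print(count_substring(negated, positive_terms),       # counts "compliant" inside "non-compliant"
      count_left_boundary(negated, positive_terms))
print(count_substring(harmless, negative_terms),      # counts "loss" inside "glossy"
      count_left_boundary(harmless, negative_terms))
print(count_left_boundary(negated, negative_terms))   # "concerns" still matches "concern"
```

For a lexicon where the Islamic-negative terms are mostly negations of the Islamic-positive ones, this kind of boundary rule matters more than it would for a general financial lexicon.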
Part 5: Permissible Trading Signal Generator
Now the piece that makes this relevant to algorithmic trading in an Islamic context. The signal must generate long-only signals (no short selling), avoid positions in Shariah-non-compliant stocks, and manage position sizing without excessive leverage (which introduces gharar through uncertainty of obligation):
@dataclass(frozen=True)
class IslamicTradingConstraints:
    """
    Hard constraints for Shariah-compliant algorithmic trading.
    Based on AAOIFI and SC Malaysia guidelines for automated strategies.
    """
    allow_short_selling: bool = False      # prohibited: borrowing to sell
    max_leverage: float = 1.0              # no leverage - gharar on obligation
    min_holding_days: int = 1              # avoid excessive speculation (maysir)
    max_turnover_per_day: float = 0.20     # position sizing limit
    require_shariah_screen: bool = True    # mandatory SC list compliance
    exclude_derivative_hedge: bool = True  # no options/futures for hedging


@dataclass
class TradingSignal:
    """A single trading signal with its components and confidence."""
    date: object
    direction: int          # +1 = buy, 0 = hold (no shorts allowed)
    strength: float         # [0, 1] signal strength
    herding_regime: str     # normal / mild / strong
    herding_prob: float
    sentiment_score: float
    cck_beta2: float        # classical herding metric
    signal_source: str      # "lstm", "cck", "sentiment", "combined"
    rationale: str

class IslamicHerdingSignalGenerator:
    """
    Combines CCK regression, LSTM detection, and sentiment
    into Shariah-compliant long-only trading signals.
    Strategy logic:
    - When herding is ABSENT and sentiment is POSITIVE → buy momentum
    - When herding is STRONG → reduce exposure (prices dislodged from fundamentals)
    - When herding TRANSITIONS from strong to mild → reentry signal
    - Never short, never leverage, always screen
    """

    def __init__(
        self,
        config: HerdingConfig = HerdingConfig(),
        constraints: IslamicTradingConstraints = IslamicTradingConstraints(),
    ):
        self.config = config
        self.constraints = constraints
        self.scorer = IslamicSentimentScorer()

    def generate_signals(
        self,
        csad_df: pl.DataFrame,
        regimes_df: pl.DataFrame,    # from HerdingLSTM.predict_regime()
        sentiment_df: pl.DataFrame,  # from IslamicSentimentScorer
        cck_result: CCKRegressionResult,
    ) -> pl.DataFrame:
        """
        Combine all signal sources into daily trading signals.
        """
        # Merge all signals on date
        signals_df = (
            csad_df
            .join(regimes_df, on="date", how="left")
            .join(sentiment_df.select([
                "date",
                "composite_sentiment",
                "herding_sentiment",
            ]), on="date", how="left")
            .sort("date")
            .with_columns([
                pl.col("composite_sentiment").fill_null(0.0),
                pl.col("herding_sentiment").fill_null(0.0),
                pl.col("herding_prob").fill_null(0.5),
                pl.col("herding_regime").fill_null(0),
            ])
        )
        # ── Signal rules ────────────────────────────────────────────────────────
        signals_df = signals_df.with_columns([
            # Lagged sentiment (avoid look-ahead)
            pl.col("composite_sentiment")
            .shift(self.config.sentiment_lag)
            .alias("sentiment_lagged"),
            # Herding regime transition: 2→1 or 1→0 (herd breaking up)
            (pl.col("herding_regime").shift(1) > pl.col("herding_regime"))
            .alias("herding_easing"),
            # Herding intensifying: 0→1 or 1→2
            (pl.col("herding_regime").shift(1) < pl.col("herding_regime"))
            .alias("herding_intensifying"),
        ]).with_columns([
            # ── Signal strength [0, 1] ────────────────────────────────────────
            # Rules are evaluated top to bottom; the first match wins.
            pl.when(
                # Strong herding → reduce (0 signal, hold cash)
                pl.col("herding_regime") == 2
            ).then(0.0)
            .when(
                # Herding easing + positive sentiment → strong buy
                pl.col("herding_easing") &
                (pl.col("sentiment_lagged") > 0.1)
            ).then(0.85)
            .when(
                # No herding + positive sentiment → moderate buy
                (pl.col("herding_regime") == 0) &
                (pl.col("sentiment_lagged") > 0.05)
            ).then(0.60)
            .when(
                # Mild herding + neutral sentiment → light position
                pl.col("herding_regime") == 1
            ).then(0.30)
            .otherwise(0.40)  # neutral
            .alias("signal_strength"),
        ]).with_columns([
            # Direction: always 0 or +1 (no shorts - Islamic constraint)
            pl.when(pl.col("signal_strength") > 0.25)
            .then(1)
            .otherwise(0)
            .cast(pl.Int32)
            .alias("direction"),
        ])
        return signals_df.select([
            "date", "direction", "signal_strength",
            "herding_regime", "herding_prob",
            "sentiment_lagged", "market_return",
            "csad", "herding_easing", "herding_intensifying",
        ])
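The constraint fields above describe limits; turning them into portfolio weights needs an explicit projection step. The sketch below shows one possible way to apply the long-only, no-leverage, and daily turnover limits to a set of target weights. `constrain_weights` is a hypothetical helper, and measuring turnover as the sum of absolute weight changes is an assumption (some desks halve that figure).

```python
def constrain_weights(current, target, max_leverage=1.0, max_turnover=0.20):
    """
    Move from `current` weights toward `target` weights subject to:
      1. long-only: negative targets are clipped to zero
      2. no leverage: weights are scaled so they sum to at most max_leverage
      3. turnover cap: total absolute weight change is at most max_turnover
    Assumes `current` already satisfies constraints 1 and 2.
    """
    names = sorted(set(current) | set(target))
    # 1. Long-only
    clipped = {n: max(0.0, target.get(n, 0.0)) for n in names}
    # 2. No leverage
    gross = sum(clipped.values())
    if gross > max_leverage:
        clipped = {n: w * max_leverage / gross for n, w in clipped.items()}
    # 3. Turnover cap: take only a fraction of the step if the full move is too large
    turnover = sum(abs(clipped[n] - current.get(n, 0.0)) for n in names)
    step = 1.0 if turnover <= max_turnover else max_turnover / turnover
    return {
        n: current.get(n, 0.0) + step * (clipped[n] - current.get(n, 0.0))
        for n in names
    }


# Hypothetical two-stock book: the raw target is short B and levered in A.
current = {"A": 0.5, "B": 0.5}
target = {"A": 1.2, "B": -0.2}
new_weights = constrain_weights(current, target)
print(new_weights)
```

Because the result is a blend of two feasible portfolios (the current book and the clipped, rescaled target), it stays long-only and unlevered while respecting the turnover cap.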
Part 6: Full Scenario Simulation
Now let's wire it all together with a realistic simulation of the Bursa Malaysia Islamic equity universe during a herding episode modelled on the type of correlated rebalancing that occurs around SC list updates:
import polars as pl
import numpy as np
from datetime import date, timedelta

rng = np.random.default_rng(2024)

# ── Simulate 3 years of daily returns for 50 Shariah-eligible stocks ─────────
N_STOCKS = 50
N_DAYS = 252 * 3
BASE_VOL = 0.018  # daily idiosyncratic vol, ~29% annualised (0.018 × √252)

# Business dates (weekdays only; public holidays are ignored)
start = date(2022, 1, 3)
biz_dates = []
d = start
while len(biz_dates) < N_DAYS:
    if d.weekday() < 5:
        biz_dates.append(d)
    d += timedelta(days=1)

# Market factor (FBMHS-like)
mkt_returns = rng.normal(0.0003, 0.012, N_DAYS)

# ── Inject herding episodes ───────────────────────────────────────────────────
# Three herding episodes as (start_day, end_day, intensity):
# SC list update, market stress, rate shock
HERDING_EPISODES = [
    (40, 60, 0.80),    # SC quarterly update → correlated rebalancing
    (180, 210, 0.90),  # Market stress episode → panic correlation
    (420, 460, 0.75),  # Rate concerns → Islamic REIT/utility selloff
]

def inject_herding(
    mkt_rets: np.ndarray,
    episodes: list,
    n_stocks: int,
    n_days: int,
    base_vol: float,
    rng: np.random.Generator,
) -> np.ndarray:
    """Generate stock returns with herding episodes injected."""
    # Stock-specific loadings on the market factor
    betas = rng.uniform(0.6, 1.4, n_stocks)
    # Idiosyncratic returns: shape (n_days, n_stocks)
    idio = rng.normal(0, base_vol, (n_days, n_stocks))
    stock_rets = (
        betas[None, :] * mkt_rets[:, None] + idio
    )
    # Inject herding: reduce idiosyncratic noise during episodes
    for start_d, end_d, intensity in episodes:
        duration = end_d - start_d
        for t in range(start_d, min(end_d, n_days)):
            # Ramp herding intensity in and out
            progress = (t - start_d) / duration
            ramp = intensity * np.sin(np.pi * progress)
            # Blend toward the factor return plus one shock shared by all stocks
            herd_shock = rng.normal(-0.003, 0.008)  # common shock
            stock_rets[t, :] = (
                (1 - ramp) * stock_rets[t, :] +
                ramp * (betas * mkt_rets[t] + herd_shock)
            )
    return stock_rets


stock_returns = inject_herding(
    mkt_returns, HERDING_EPISODES, N_STOCKS, N_DAYS, BASE_VOL, rng
)

# Build wide DataFrame
stock_codes = [f"MY{str(i).zfill(4)}" for i in range(N_STOCKS)]
wide_dict = {"date": biz_dates}
wide_dict.update({
    code: stock_returns[:, i].tolist()
    for i, code in enumerate(stock_codes)
})
returns_wide = pl.DataFrame(wide_dict)

# ── Run CSAD engine ───────────────────────────────────────────────────────────
print("Computing CSAD...")
csad_df = compute_csad(returns_wide)
print(f" {len(csad_df)} trading days processed")

# ── CCK Regression ────────────────────────────────────────────────────────────
print("\nRunning CCK regression...")
config = HerdingConfig()
cck_result = run_cck_regression(csad_df, config)
print(cck_result.summary())

# ── Simulate news sentiment ───────────────────────────────────────────────────
SAMPLE_HEADLINES = [
    # Normal period
    ("Earnings beat", "positive"),
    ("Revenue growth driven by domestic demand", "positive"),
    ("Shariah compliant portfolio expansion", "positive"),
    ("Sukuk issuance oversubscribed", "positive"),
    # Herding period
    ("Institutional buying drives Islamic equities higher", "herding"),
    ("Fund flows into Shariah stocks amid SC update", "herding"),
    ("Retail investors follow institutional momentum", "herding"),
    ("Crowded trade in FBMHS index constituents", "herding"),
    # Stress period
    ("Shariah non-compliant concerns raise flags", "negative"),
    ("SC removes stocks from approved list", "negative"),
    ("Earnings miss disappoints analysts", "negative"),
    ("Weak consumer sentiment hits staples", "negative"),
]
news_rows = []
for i, d in enumerate(biz_dates):
    # More negative/herding news during episodes
    # (end day exclusive, matching inject_herding and the training labels)
    in_episode = any(s <= i < e for s, e, _ in HERDING_EPISODES)
    if in_episode:
        headlines = [h for h, t in SAMPLE_HEADLINES if t in ("herding", "negative")]
    else:
        headlines = [h for h, t in SAMPLE_HEADLINES if t == "positive"]
    # 1 to 3 articles per day
    for _ in range(int(rng.integers(1, 4))):
        news_rows.append({
            "date": d,
            "headline": str(rng.choice(headlines)),
        })
news_df = pl.DataFrame(news_rows).with_columns(
    pl.col("date").cast(pl.Date)
)
scorer = IslamicSentimentScorer()
sentiment_df = scorer.score_news_batch(news_df)
print(f"\nSentiment scored for {len(sentiment_df)} trading days")

# ── LSTM Herding Detection ────────────────────────────────────────────────────
print("\nTraining LSTM herding detector...")
# Generate training labels from known episode periods
labels = np.zeros(len(csad_df))
for start_d, end_d, _ in HERDING_EPISODES:
    labels[start_d:end_d] = 1
lstm_model = HerdingLSTM(config=config)
_ = lstm_model.fit_supervised(csad_df, labels, n_epochs=80, lr=0.002)
regimes_df = lstm_model.predict_regime(csad_df)

# ── Generate Trading Signals ──────────────────────────────────────────────────
print("\nGenerating Islamic trading signals...")
generator = IslamicHerdingSignalGenerator(config=config)
signals_df = generator.generate_signals(
    csad_df=csad_df,
    regimes_df=regimes_df,
    sentiment_df=sentiment_df.with_columns(pl.col("date").cast(pl.Date)),
    cck_result=cck_result,
)

# ── Simple Backtest ───────────────────────────────────────────────────────────
def backtest_signal(
    signals: pl.DataFrame,
    mkt_returns: np.ndarray,
    cost_bps: float = 10,  # 10 bps charged on each position change (entry or exit)
) -> pl.DataFrame:
    """
    Long-only strategy: hold the market when the previous day's signal=1, cash when=0.
    Islamic constraints: no leverage, no shorts, cost applied on turnover.
    """
    direction = signals["direction"].to_numpy()
    mkt_ret = mkt_returns[:len(direction)]
    # The signal for day t uses day-t CSAD and returns, so it can only be traded
    # the next day: the position earning day t's return is the signal from day t-1.
    position = np.concatenate(([0], direction[:-1]))
    # Strategy return: market return when invested, 0 when in cash
    strat_ret = position * mkt_ret
    # Transaction costs on position changes (starting from cash)
    turnover = np.abs(np.diff(position, prepend=0))
    cost = turnover * cost_bps / 10000
    strat_ret = strat_ret - cost
    # Cumulative returns
    cum_mkt = np.cumprod(1 + mkt_ret)
    cum_strat = np.cumprod(1 + strat_ret)
    return pl.DataFrame({
        "date": signals["date"].to_list(),
        "signal": direction.tolist(),
        "market_return": mkt_ret.tolist(),
        "strategy_return": strat_ret.tolist(),
        "cum_market": cum_mkt.tolist(),
        "cum_strategy": cum_strat.tolist(),
    })


backtest = backtest_signal(signals_df, mkt_returns)

# ── Performance Summary ───────────────────────────────────────────────────────
mkt_r = backtest["market_return"].to_numpy()
strat_r = backtest["strategy_return"].to_numpy()


def ann_return(r):
    return float(np.mean(r)) * 252


def ann_vol(r):
    return float(np.std(r, ddof=1)) * np.sqrt(252)


def sharpe(r):
    return ann_return(r) / (ann_vol(r) + 1e-10)


def max_dd(r):
    c = np.cumprod(1 + r)
    return float(((c - np.maximum.accumulate(c)) / np.maximum.accumulate(c)).min())


print(f"\n{'═'*52}")
print(f" Backtest Performance Summary")
print(f"{'─'*52}")
print(f" {'Metric':<28} {'Market':>10} {'Strategy':>10}")
print(f"{'─'*52}")
print(f" {'Ann. Return':<28} {ann_return(mkt_r)*100:>9.2f}% {ann_return(strat_r)*100:>9.2f}%")
print(f" {'Ann. Volatility':<28} {ann_vol(mkt_r)*100:>9.2f}% {ann_vol(strat_r)*100:>9.2f}%")
print(f" {'Sharpe Ratio':<28} {sharpe(mkt_r):>10.3f} {sharpe(strat_r):>10.3f}")
print(f" {'Max Drawdown':<28} {max_dd(mkt_r)*100:>9.2f}% {max_dd(strat_r)*100:>9.2f}%")
print(f"{'─'*52}")

# Signal analysis
invested_days = int(signals_df["direction"].sum())
total_days = len(signals_df)
herding_days = int((regimes_df["herding_regime"] > 0).sum())
print(f"\n Signal Diagnostics")
print(f"{'─'*52}")
print(f" Total trading days: {total_days}")
print(f" Days invested (long): {invested_days} ({invested_days/total_days*100:.1f}%)")
print(f" Days in cash (signal=0): {total_days-invested_days}")
print(f" Days herding detected: {herding_days} ({herding_days/total_days*100:.1f}%)")
print(f" Herding episodes (CCK β₂): {cck_result.beta2:>+.5f} "
      f"({'significant' if cck_result.herding_detected else 'not significant'})")
print(f"{'═'*52}\n")
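One check worth running on any backtest of this kind is the timing of the position relative to the signal. A signal computed from day t's returns can only be traded from day t+1, and applying it to day t's return inflates performance. The toy sketch below makes the effect visible with a deliberately contaminated signal; `strategy_returns` and the four-day return series are illustrative assumptions.

```python
def strategy_returns(signals, returns, lag=1, cost_bps=0.0):
    """
    Long-only strategy returns with the signal applied `lag` days later.
    lag=0 trades on the same day the signal is computed (look-ahead if the
    signal uses that day's return); lag=1 trades the following day.
    Costs are charged on each change in position, starting from cash.
    """
    positions = [0] * lag + list(signals[:len(signals) - lag])
    out, prev = [], 0
    for pos, ret in zip(positions, returns):
        cost = abs(pos - prev) * cost_bps / 10_000
        out.append(pos * ret - cost)
        prev = pos
    return out


# Hypothetical returns, and a signal that "peeks": long exactly on up days.
returns = [0.01, -0.02, 0.03, -0.01]
peeking_signal = [1 if r > 0 else 0 for r in returns]

same_day = strategy_returns(peeking_signal, returns, lag=0)
next_day = strategy_returns(peeking_signal, returns, lag=1)
print(sum(same_day), sum(next_day))
```

Applied on the same day, the peeking signal captures every up day and none of the down days; lagged by one day it captures only the down days. A real herding signal is less extreme, but the direction of the bias is the same, which is why the lag belongs in the backtest rather than in the interpretation.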
Part 7: Why Herding in Islamic Equities Is Structurally Different
The simulation above injects herding at three characteristic points. In the real Malaysian Islamic equity market, herding emerges predictably at similar structural triggers:
SC Quarterly List Updates. The Securities Commission reviews and updates its list of Shariah-compliant securities quarterly. When stocks are added or removed, all SC-compliant fund managers must rebalance simultaneously by a defined deadline, toward the same target universe. This is not irrational individual behavior; it's a regulatory obligation that produces coordinated trading that looks identical to herding from the outside. CSAD spikes at these junctures. The LSTM captures this because the pattern repeats quarterly.
BNM and Shariah Board Announcements. A fatwa from a major Shariah board affecting a commonly held stock moves the entire Islamic fund community in the same direction. Unlike conventional markets where different investor types respond differently to regulatory news, the Islamic fund community has a homogeneous response function.
Index Reconstitution Events. When the FBMHS or Dow Jones Islamic Malaysia index reconstitutes, passive Islamic funds must trade into the new composition. The size of passive Islamic fund AUM in Malaysia makes this a material price impact event with detectable herding signatures.
Liquidity Cascades. The Islamic eligible universe in Malaysia is approximately 600-700 stocks, but meaningful liquidity is concentrated in 100-150. During stress, when funds need to raise cash, the concentrated sell pressure in the liquid subset produces sharp cross-sectional return compression, exactly the pattern the CCK herding regression identifies.
Part 8: Permissibility of Algorithmic Trading (The Fiqh Dimension)
Using machine learning to generate trading signals in Islamic equities raises a question I haven't seen adequately addressed in the literature: is algorithmic trading itself Shariah-permissible?
The short answer from most contemporary Shariah scholars is yes, with conditions:
Permissible: Long-only systematic strategies that identify and act on market information asymmetries. The automated nature doesn't change the nature of the underlying transaction: you are buying ownership in a halal business.
Permissible: Sentiment-based signal generation using publicly available news. This is a form of ijtihad using available information, the same process a human analyst performs, executed at speed.
Questionable: High-frequency strategies that capture microstructure effects rather than fundamental value. If the strategy profits purely from latency arbitrage with no connection to economic value, some scholars classify this as a form of maysir (gambling on price movements with zero fundamental basis).
Prohibited: Strategies involving short selling, leverage, or derivatives (swaps, options, futures) for speculative rather than hedging purposes.
The herding-based signal in this post is firmly permissible: it is long-only, unlevered, and based on identifying periods when prices may have dislodged from fundamentals due to correlated behavior. Reducing exposure when herding is strong is a risk management decision, not a speculative one.
Part 9: What the Framework Detects That CCK Misses
Running CCK regression alone on the Malaysian Islamic equity universe would confirm herding is present (β₂ < 0 and significant), but it tells you nothing about:
- Timing: CCK gives you a full-sample estimate. It doesn't tell you which specific days or episodes are herding-driven.
- Intensity dynamics: The LSTM's herding probability is continuous - it shows you herding building and dissipating over time, not just present/absent.
- Leading indicators: The sentiment signal precedes price-based herding by 1-3 days. By the time CCK detects herding in price data, it has already started. Sentiment gets there earlier.
- Regime transitions: The strategy's best signal is the herding-easing period - when strong herding transitions to mild or normal. This is when prices are most likely to mean-revert toward fundamentals. CCK can't detect regime transitions at all.
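The regime-transition idea in the list above can be sketched in a few lines: bucket the daily herding probabilities into regimes, then flag the days on which the regime steps down. The thresholds reuse the 0.40 and 0.65 defaults from `predict_regime`; the helper names and the probability series are illustrative assumptions.

```python
def to_regime(prob, mild_threshold=0.40, high_threshold=0.65):
    """Map a herding probability to 0 = normal, 1 = mild, 2 = strong."""
    if prob >= high_threshold:
        return 2
    if prob >= mild_threshold:
        return 1
    return 0


def transition_days(regimes):
    """
    Return (easing, intensifying): indices where the regime is lower or
    higher than on the previous day. Day 0 has no previous day and is skipped.
    """
    easing = [t for t in range(1, len(regimes)) if regimes[t] < regimes[t - 1]]
    intensifying = [t for t in range(1, len(regimes)) if regimes[t] > regimes[t - 1]]
    return easing, intensifying


# Hypothetical probabilities: herding builds for three days, then dissipates.
probs = [0.20, 0.50, 0.70, 0.80, 0.60, 0.30]
regimes = [to_regime(p) for p in probs]
easing, intensifying = transition_days(regimes)
print(regimes, easing, intensifying)
```

Here the regime path is normal, mild, strong, strong, mild, normal, so days 4 and 5 are flagged as easing. Those are the days the signal generator treats as candidate re-entry points when lagged sentiment is positive.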
This is the core argument for ML over pure regression in this context: the patterns are temporally structured, nonlinear, and have leading indicators that classical econometrics wasn't designed to exploit.
Herding in Islamic equities is not a bug; it is a structural feature of a market with a homogeneous investor base constrained by a common regulatory screen. The question for a quantitative fund manager is not whether herding exists, but whether you can detect it early enough to either reduce your exposure before prices dislodge from fundamentals, or enter positions after prices revert.
The framework here (CSAD, LSTM, domain-specific sentiment) gives you all three detection layers. The signal is permissible. The code is ready. The backtest is honest.
This post draws on the framework from "Machine Learning vs. Human Investors: Analyzing Adaptive Herding Behavior in US Stocks vs. Shariah-Compliant Stocks in Malaysia." The Python implementation is original. The LSTM uses a pure NumPy implementation for transparency; for production, use PyTorch with proper BPTT. Simulated data approximates Bursa Malaysia Shariah market characteristics. Not investment advice.