{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Torque model\n",
"\n",
"This example studies the causes of leakage of a mechanical model.\n",
"\n",
"It has several parameters: Torque (T), Joint (J), Angle (A), Vibration (V) and Leak (L)."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"ExecuteTime": {
"end_time": "2018-06-14T16:58:21.622148Z",
"start_time": "2018-06-14T16:58:21.006932Z"
}
},
"outputs": [],
"source": [
"import pyAgrum as gum\n",
"import pyAgrum.lib.notebook as gnb\n",
"import openturns as ot\n",
"import otagrum\n",
"import numpy as np"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Probabilistic model**"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"ExecuteTime": {
"end_time": "2018-06-14T16:58:21.635380Z",
"start_time": "2018-06-14T16:58:21.625843Z"
}
},
"outputs": [],
"source": [
"# Marginal distributions\n",
"Torque = ot.LogNormal(0.0, 0.25)\n",
"Angle = ot.TruncatedNormal(0.0, 2.0, -8.0, 8.0)\n",
"Joint = ot.Uniform(1.8, 2.2)\n",
"\n",
"# Dependence\n",
"rho = 0.5\n",
"TorqueAngleCopula = ot.NormalCopula(ot.CorrelationMatrix(2, [1.0, rho, rho, 1.0]))\n",
"copula = ot.ComposedCopula([TorqueAngleCopula, ot.IndependentCopula(1)])\n",
"\n",
"# Joint distribution if needed\n",
"TorqueAngle = ot.ComposedDistribution([Torque, Angle], TorqueAngleCopula)\n",
"fullDistribution = ot.ComposedDistribution([Torque, Angle, Joint], copula)\n",
"\n",
"# Leakage angle (rd)\n",
"angleMax = 5.0\n",
"\n",
"# Leakage joint (mm)\n",
"jointMin = 2.0\n",
"jointSpread = 0.1\n",
"\n",
"# Vibration torque (kN.m)\n",
"torqueSpread = 2.0\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**(Discrete) Graphical model**"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"ExecuteTime": {
"end_time": "2018-06-14T17:13:01.818666Z",
"start_time": "2018-06-14T17:13:01.780668Z"
}
},
"outputs": [
{
"data": {
"text/html": [
""
],
"text/plain": [
"(gum::BayesNet@0x55e377bad020) BN{nodes: 5, arcs: 5, domainSize: 10^7.09215, parameters: 92011, compression ratio: 99% }"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"n_ticks = 100\n",
"nodes = 16\n",
"\n",
"\n",
"def completeTicks(range,ticks):\n",
" if range is None:\n",
" return [float(\"-inf\")]+ticks+[float(\"inf\")]\n",
" else:\n",
" return [range.getLowerBound()[0]] + ticks + [range.getUpperBound()[0]]\n",
"\n",
"\n",
"torque_ticks = [\n",
" (n_ticks - i) * Torque.getRange().getLowerBound()[0] / (n_ticks + 1) +\n",
" (i + 1.0) * Torque.getRange().getUpperBound()[0] / (n_ticks + 1)\n",
" for i in range(n_ticks)\n",
"]\n",
"\n",
"angle_ticks = [(n_ticks - i) * Angle.getRange().getLowerBound()[0] /\n",
" (n_ticks + 1) +\n",
" (i + 1.0) * Angle.getRange().getUpperBound()[0] / (n_ticks + 1)\n",
" for i in range(n_ticks)]\n",
"\n",
"joint_ticks = [(n_ticks - i) * Joint.getRange().getLowerBound()[0] /\n",
" (n_ticks + 1) +\n",
" (i + 1.0) * Joint.getRange().getUpperBound()[0] / (n_ticks + 1)\n",
" for i in range(n_ticks)]\n",
"\n",
"vibration_ticks = [-1.0, -0.5, 0.0, 0.5, 1.0]\n",
"\n",
"bn = gum.BayesNet()\n",
"bn.add(gum.DiscretizedVariable(\"T\", \"Torque\", completeTicks(Torque.getRange(),torque_ticks)))\n",
"bn.add(gum.DiscretizedVariable(\"A\", \"Angle\", completeTicks(Angle.getRange(),angle_ticks)))\n",
"bn.add(gum.DiscretizedVariable(\"J\", \"Joint\", completeTicks(Joint.getRange(),joint_ticks)))\n",
"\n",
"bn.add(\n",
" gum.DiscretizedVariable(\"V\", \"Vibration\", completeTicks(None,vibration_ticks)))\n",
"bn.add(gum.LabelizedVariable(\"L\", \"Leak\", [\"False\", \"True\"]))\n",
"\n",
"bn.addArc(\"T\", \"V\")\n",
"bn.addArc(\"T\", \"A\")\n",
"bn.addArc(\"J\", \"V\")\n",
"bn.addArc(\"J\", \"L\")\n",
"bn.addArc(\"A\", \"L\")\n",
"bn"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Discretizations**"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"\n",
"# This function allows to discretize a conditional distribution of X_d knowing X_1,...,X_{d-1} from a joint distribution of (X_1,...,X_d) and a discretization grid.\n",
"def discretizeFromJoint(fullDistribution, ticks):\n",
" fullDimension = fullDistribution.getDimension()\n",
" conditioningDistribution = fullDistribution.getMarginal([i for i in range(fullDimension-1)])\n",
" # Add the range bounds to the given ticks\n",
" lower = fullDistribution.getRange().getLowerBound()\n",
" upper = fullDistribution.getRange().getUpperBound()\n",
" expandedTicks = [0]*len(ticks)\n",
" for i in range(fullDimension):\n",
" expandedTicks[i] = [lower[i]] + ticks[i] + [upper[i]]\n",
" # Now perform the full discretization\n",
" lengths = [(len(t)-1) for t in expandedTicks]\n",
" tuples = ot.Tuples(lengths).generate()\n",
" probabilities = ot.Point(len(tuples))\n",
" for i in range(len(tuples)):\n",
" tuple = tuples[i]\n",
" aFull = [expandedTicks[j][tuple[j]] for j in range(fullDimension)]\n",
" bFull = [expandedTicks[j][tuple[j]+1] for j in range(fullDimension)]\n",
" aConditioning = [expandedTicks[j][tuple[j]] for j in range(fullDimension-1)]\n",
" bConditioning = [expandedTicks[j][tuple[j]+1] for j in range(fullDimension-1)]\n",
" den = conditioningDistribution.computeProbability(ot.Interval(aConditioning, bConditioning))\n",
" if den > 0.0:\n",
" num = fullDistribution.computeProbability(ot.Interval(aFull, bFull))\n",
" probabilities[i] = num / den\n",
" return probabilities\n",
"\n",
"# This function allows to discretize a conditional distribution knowing its conditional density function given as a Function, its conditioning distribution and a discretization grid.\n",
"# WARNING: The result is NOT normalized\n",
"def discretizeFromConditionalDensity(conditionalDensity, conditioningDistribution, ticks, useSlowIntegration=True, nodesNumber=32):\n",
" fullDimension = conditioningDistribution.getDimension() + 1\n",
" if useSlowIntegration:\n",
" # Accurate but slow\n",
" integrator = ot.IteratedQuadrature()\n",
" else:\n",
" # Less accurate for non-smooth integrand but fast\n",
" ot.ResourceMap.SetAsUnsignedInteger(\"GaussLegendre-DefaultMarginalIntegrationPointsNumber\", nodesNumber)\n",
" integrator = ot.GaussLegendre(fullDimension)\n",
" # Add the range bounds to the given ticks\n",
" lower = list(conditioningDistribution.getRange().getLowerBound())\n",
" upper = list(conditioningDistribution.getRange().getUpperBound())\n",
" # For the conditioned variable it has to be estimated. We assume that the given\n",
" # tick range is a correct margin to get the lower and upper bounds\n",
" conditionedMin = min(ticks[fullDimension - 1])\n",
" conditionedMax = max(ticks[fullDimension - 1])\n",
" delta = conditionedMax - conditionedMin\n",
" lower = lower + [conditionedMin - delta]\n",
" upper = upper + [conditionedMax + delta]\n",
" expandedTicks = [0]*fullDimension\n",
" for i in range(fullDimension):\n",
" expandedTicks[i] = [lower[i]] + ticks[i] + [upper[i]]\n",
" # Now perform the full discretization\n",
" lengths = [(len(t)-1) for t in expandedTicks]\n",
" tuples = ot.Tuples(lengths).generate()\n",
" probabilities = ot.Point(len(tuples))\n",
"\n",
" def kernel(x):\n",
" x = np.array(x)\n",
" return conditionalDensity(x) * np.array(conditioningDistribution.computePDF(x[:,0:fullDimension-1]))\n",
"\n",
" for i in range(len(tuples)):\n",
" tuple = tuples[i]\n",
" aFull = [expandedTicks[j][tuple[j]] for j in range(fullDimension)]\n",
" bFull = [expandedTicks[j][tuple[j]+1] for j in range(fullDimension)]\n",
" \n",
" num = integrator.integrate(ot.PythonFunction(fullDimension, 1, func_sample=kernel), ot.Interval(aFull, bFull))[0]\n",
" probabilities[i] = num\n",
" return probabilities\n",
" \n",
"# This function allows to discretize a conditional Bernoulli distribution knowing its conditional probability function given as a Function, its conditioning distribution and a conditional discretization grid.\n",
"def discretizeBernoulliFromConditionalProbability(conditionalProbability, conditioningDistribution, ticks, useSlowIntegration=True, nodesNumber=32):\n",
" conditioningDimension = conditioningDistribution.getDimension()\n",
" if useSlowIntegration:\n",
" # Accurate but slow\n",
" integrator = ot.IteratedQuadrature()\n",
" else:\n",
" # Less accurate for non-smooth integrand but fast\n",
" ot.ResourceMap.SetAsUnsignedInteger(\"GaussLegendre-DefaultMarginalIntegrationPointsNumber\", nodesNumber)\n",
" integrator = ot.GaussLegendre(conditioningDimension)\n",
" # Less accurate for non-smooth integrand but fast\n",
" ot.ResourceMap.SetAsUnsignedInteger(\"GaussLegendre-DefaultMarginalIntegrationPointsNumber\", 32)\n",
" integrator = ot.GaussLegendre(conditioningDimension)\n",
" # Add the range bounds to the given ticks\n",
" lower = list(conditioningDistribution.getRange().getLowerBound())\n",
" upper = list(conditioningDistribution.getRange().getUpperBound())\n",
" # Add the range bounds to the given ticks\n",
" lower = conditioningDistribution.getRange().getLowerBound()\n",
" upper = conditioningDistribution.getRange().getUpperBound()\n",
" expandedTicks = [0]*len(ticks)\n",
" for i in range(conditioningDimension):\n",
" expandedTicks[i] = [lower[i]] + ticks[i] + [upper[i]]\n",
" # Now perform the full discretization\n",
" lengths = [(len(t)-1) for t in expandedTicks]\n",
" tuples = ot.Tuples(lengths).generate()\n",
" probabilitiesTrue = [0]*len(tuples)\n",
"\n",
" def kernel(x):\n",
" x = np.array(x)\n",
" return conditionalProbability(x) * np.array(conditioningDistribution.computePDF(x[:,0:conditioningDimension]))\n",
"\n",
" for i in range(len(tuples)):\n",
" tuple = tuples[i]\n",
" aConditioning = [expandedTicks[j][tuple[j]] for j in range(conditioningDimension)]\n",
" bConditioning = [expandedTicks[j][tuple[j]+1] for j in range(conditioningDimension)]\n",
" den = conditioningDistribution.computeProbability(ot.Interval(aConditioning, bConditioning))\n",
" if den > 0.0:\n",
" num = integrator.integrate(ot.PythonFunction(conditioningDimension, 1, func_sample=kernel), ot.Interval(aConditioning, bConditioning))[0]\n",
" probabilitiesTrue[i] = min(1.0, num / den)\n",
" probabilities = ot.Point([1.0 - p for p in probabilitiesTrue] + probabilitiesTrue)\n",
" return probabilities"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"ExecuteTime": {
"end_time": "2018-06-14T17:03:51.492223Z",
"start_time": "2018-06-14T16:58:21.864069Z"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"3\n"
]
}
],
"source": [
"# Discretization of everything\n",
"\n",
"# Compute P(Leakage = True | Angle = angle, Joint = joint)\n",
"def P_LeakageKnowingAngleAndJoint(x):\n",
" x = np.array(x)\n",
" angle = x[:, 0]\n",
" joint = x[:, 1]\n",
" s = (1, x.shape[0])\n",
" sp = (x.shape[0], 1)\n",
" one = np.ones(s)\n",
" return (np.minimum(np.abs(angle / angleMax), one) * np.minimum(\n",
" one, np.exp(-(joint - jointMin) / jointSpread))).reshape(sp)\n",
"\n",
"\n",
"# Compute K.p(Vibration = v | Torque = torque, Joint = joint) where K is unknown\n",
"def f_VibrationKnowingTorqueAndJoint(x):\n",
" x = np.array(x)\n",
" joint = x[:, 0]\n",
" torque = x[:, 1]\n",
" jointRed = joint / jointSpread\n",
" torqueRed = torque / torqueSpread\n",
" return ((1.0 + jointRed**2 + torqueRed**2)**(-4.0)).reshape(x.shape[0], 1)\n",
"\n",
"\n",
"AngleKnowingTorque = discretizeFromJoint(TorqueAngle,\n",
" [torque_ticks, angle_ticks])\n",
"\n",
"LeakageKnowingAngleAndJoint = discretizeBernoulliFromConditionalProbability(\n",
" P_LeakageKnowingAngleAndJoint, ot.ComposedDistribution([Angle, Joint]),\n",
" [angle_ticks, joint_ticks], False, nodes)\n",
"\n",
"VibrationKnowingTorqueAndJoint = discretizeFromConditionalDensity(\n",
" f_VibrationKnowingTorqueAndJoint, ot.ComposedDistribution([Torque, Joint]),\n",
" [torque_ticks, joint_ticks, vibration_ticks], False, nodes)\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Discretized Parameters for the BN**"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"ExecuteTime": {
"end_time": "2018-06-14T17:03:52.048139Z",
"start_time": "2018-06-14T17:03:51.494271Z"
}
},
"outputs": [],
"source": [
"bn.cpt(\"J\").fillWith(otagrum.Utils.Discretize(Torque, bn.variable(\"J\").toDiscretizedVar()))\n",
"bn.cpt(\"T\").fillWith(otagrum.Utils.Discretize(Torque, bn.variable(\"T\").toDiscretizedVar()))\n",
"\n",
"bn.cpt(\"A\").fillWith(list(AngleKnowingTorque)).normalizeAsCPT()\n",
"\n",
"p=gum.Potential().add(bn.variable(\"J\")).add(bn.variable(\"A\")).add(bn.variable(\"L\"))\n",
"p.fillWith(list(LeakageKnowingAngleAndJoint))\n",
"s=bn.cpt(\"L\").var_names\n",
"s.reverse()\n",
"p.reorganize(s)\n",
"bn.cpt(\"L\").fillWith(p)\n",
"\n",
"p=gum.Potential().add(bn.variable(\"J\")).add(bn.variable(\"T\")).add(bn.variable(\"V\"))\n",
"p.fillWith(list(VibrationKnowingTorqueAndJoint))\n",
"s=bn.cpt(\"V\").var_names\n",
"s.reverse()\n",
"p.reorganize(s)\n",
"bn.cpt(\"V\").fillWith(p).normalizeAsCPT()\n",
"gnb.showInformation(bn)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"ExecuteTime": {
"end_time": "2018-06-14T17:03:53.929622Z",
"start_time": "2018-06-14T17:03:52.050947Z"
}
},
"outputs": [],
"source": [
"gnb.showInference(bn,size=\"20\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"ExecuteTime": {
"end_time": "2018-06-14T17:03:56.100016Z",
"start_time": "2018-06-14T17:03:53.933280Z"
}
},
"outputs": [],
"source": [
"gnb.showInference(bn,evs={\"L\":True},size=\"20\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"ExecuteTime": {
"end_time": "2018-06-14T17:03:58.035674Z",
"start_time": "2018-06-14T17:03:56.104253Z"
}
},
"outputs": [],
"source": [
"gnb.showInference(bn,evs={\"L\":False,\"A\":\"0.2\"},size=\"20\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"ExecuteTime": {
"end_time": "2018-06-14T17:03:58.042430Z",
"start_time": "2018-06-14T17:03:58.038094Z"
}
},
"outputs": [],
"source": [
"ie=gum.LazyPropagation(bn)\n",
"ie.addJointTarget(set([\"T\",\"J\"]))\n",
"ie.setEvidence({\"L\":True})\n",
"ie.makeInference()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"ExecuteTime": {
"end_time": "2018-06-14T17:04:25.963810Z",
"start_time": "2018-06-14T17:03:58.044484Z"
}
},
"outputs": [],
"source": [
"distrib=otagrum.Utils.FromPotential(ie.jointPosterior([\"T\", \"J\"]))\n",
"distrib.drawPDF()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"ExecuteTime": {
"end_time": "2018-06-14T17:04:25.969569Z",
"start_time": "2018-06-14T17:04:25.965716Z"
}
},
"outputs": [],
"source": [
"ie=gum.LazyPropagation(bn)\n",
"ie.addJointTarget(set([\"T\",\"J\"]))\n",
"ie.setEvidence({\"L\":False})\n",
"ie.makeInference()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"ExecuteTime": {
"end_time": "2018-06-14T17:04:52.660655Z",
"start_time": "2018-06-14T17:04:25.971382Z"
}
},
"outputs": [],
"source": [
"distrib=otagrum.Utils.FromPotential(ie.jointPosterior([\"T\",\"J\"]))\n",
"distrib.drawPDF()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"hide_input": false,
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.4"
}
},
"nbformat": 4,
"nbformat_minor": 4
}