Mailing List Archive

[MediaWiki-commits] [Gerrit] search/MjoLniR[master]: Add end-to-end integration test
EBernhardson has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/406070 )

Change subject: Add end-to-end integration test
......................................................................

Add end-to-end integration test

A basic end to end run through of the training pipeline. It's
of course a bit slow, but worthwhile to see the whole operation
run from end to end.

* verifies that the general premise works
* outputs from one stage are expected by the input from the next
* models are in expected places and loadable
* evaluations run against the models match train time metrics

Change-Id: I8ad5fe1dbbbd50b897362b44411cfc19650b0390
---
M jvm/src/main/scala/org/wikimedia/search/mjolnir/PythonUtils.scala
M mjolnir/test/conftest.py
A mjolnir/test/fixtures/requests/test_integration.sqlite3
M mjolnir/test/training/test_xgboost.py
M mjolnir/training/xgboost.py
M mjolnir/utilities/data_pipeline.py
M mjolnir/utilities/make_folds.py
M mjolnir/utilities/training_pipeline.py
8 files changed, 63 insertions(+), 20 deletions(-)


git pull ssh://gerrit.wikimedia.org:29418/search/MjoLniR refs/changes/70/406070/1

diff --git a/jvm/src/main/scala/org/wikimedia/search/mjolnir/PythonUtils.scala b/jvm/src/main/scala/org/wikimedia/search/mjolnir/PythonUtils.scala
index 8c6929e..9200fe9 100644
--- a/jvm/src/main/scala/org/wikimedia/search/mjolnir/PythonUtils.scala
+++ b/jvm/src/main/scala/org/wikimedia/search/mjolnir/PythonUtils.scala
@@ -1,7 +1,7 @@
package org.wikimedia.search.mjolnir

import org.apache.spark.ml.feature.{LabeledPoint => MLLabeledPoint}
-import org.apache.spark.ml.linalg.{Vector => MLVector}
+import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vectors, Vector => MLVector}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{Dataset, Row}
@@ -13,22 +13,38 @@
* pyspark.
*/
object PythonUtils {
+ private def shiftVector(vec: MLVector): MLVector = vec match {
+ case y: DenseVector => Vectors.dense(Array(0D) ++ y.toArray)
+ case y: SparseVector => Vectors.sparse(y.size + 1, y.indices.map(_ + 1), y.values)
+ }
+
/**
* There is no access to LabeledPoint from pyspark, but various methods such as
* trainWithRDD and eval require an RDD[MLLabeledPoint]. This offers a bridge to
* convert a Dataset into the required format.
*
+ * @deprecated
* @param ds Input dataframe containing features and label
* @param featureCol Name of the column containing feature vectors
* @param labelCol Name of the column containing numeric labels
+ * @param shiftRight Shift all features to index + 1. This is a disapointing hack,
+ * but due to the way data files are created feature indices start
+ * at 1 and the 0 feature is empty. This allows to shift to match
+ * when evaluating a dataframe againts a model trained that way.
*/
- def toLabeledPoints(ds: Dataset[_], featureCol: String, labelCol: String): RDD[MLLabeledPoint] = {
+ def toLabeledPoints(ds: Dataset[_], featureCol: String, labelCol: String, shiftRight: Boolean): RDD[MLLabeledPoint] = {
ds.select(col(featureCol), col(labelCol).cast(DoubleType)).rdd.map {
case Row(feature: MLVector, label: Double) =>
+ val shiftedFeature = if (shiftRight) shiftVector(feature) else feature
MLLabeledPoint(label, feature)
}
}

+ def toLabeledPoints(ds: Dataset[_], featureCol: String, labelCol: String): RDD[MLLabeledPoint] = {
+ toLabeledPoints(ds, featureCol, labelCol, shiftRight = false)
+ }
+
+
/**
* Training/evaluating a ranking model in XGBoost requires rows for the same
* query to be provided sequentially, and it needs to know for each partition
diff --git a/mjolnir/test/conftest.py b/mjolnir/test/conftest.py
index efc8441..c4c3d77 100644
--- a/mjolnir/test/conftest.py
+++ b/mjolnir/test/conftest.py
@@ -72,7 +72,8 @@
.set('spark.jars.packages', ','.join([
'ml.dmlc:xgboost4j-spark:0.8-wmf-1',
'org.wikimedia.search:mjolnir:0.4-SNAPSHOT',
- 'org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0']))
+ 'org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0',
+ 'org.wikimedia.analytics.refinery.hive:refinery-hive:0.0.57']))
# By default spark will shuffle to 200 partitions, which is
# way too many for our small test cases. This cuts execution
# time of the tests in half.
diff --git a/mjolnir/test/fixtures/requests/test_integration.sqlite3 b/mjolnir/test/fixtures/requests/test_integration.sqlite3
new file mode 100644
index 0000000..44957bf
--- /dev/null
+++ b/mjolnir/test/fixtures/requests/test_integration.sqlite3
Binary files differ
diff --git a/mjolnir/test/training/test_xgboost.py b/mjolnir/test/training/test_xgboost.py
index 100a3e8..ba8dc48 100644
--- a/mjolnir/test/training/test_xgboost.py
+++ b/mjolnir/test/training/test_xgboost.py
@@ -1,7 +1,6 @@
from __future__ import absolute_import
import mjolnir.training.xgboost
from pyspark.ml.linalg import Vectors
-import pyspark.sql
import pytest


@@ -107,7 +106,7 @@
# What else can we practically assert?
df_transformed = model.transform(df_train)
assert 'prediction' in df_transformed.columns
- assert 0.74 == pytest.approx(model.eval(df_train), abs=0.01)
+ assert 0.59 == pytest.approx(model.eval(df_train), abs=0.01)

# make sure train didn't clobber the incoming params
assert params['num_rounds'] == 1
diff --git a/mjolnir/training/xgboost.py b/mjolnir/training/xgboost.py
index 4d6bb9d..82c9d6c 100644
--- a/mjolnir/training/xgboost.py
+++ b/mjolnir/training/xgboost.py
@@ -59,7 +59,10 @@
# the pipeline so various tasks can accept a single column to work with.
df.select('label', 'features', F.concat('wikiid', 'query').alias('queryId'))
.repartition(num_partitions, 'queryId')
- .sortWithinPartitions('queryId')
+ # xgboost ndcg isn't stable if labels of matching predictions are in
+ # different input orders so sort by label too. Sorting labels
+ # ascending is the less generous option, where the worst label comes first.
+ .sortWithinPartitions('queryId', F.col('label').asc())
.cache())

j_groups = df._sc._jvm.org.wikimedia.search.mjolnir.PythonUtils.calcQueryGroups(
@@ -292,7 +295,7 @@
df_grouped = df_test

j_rdd = df_test._sc._jvm.org.wikimedia.search.mjolnir.PythonUtils.toLabeledPoints(
- df_grouped._jdf, feature_col, label_col)
+ df_grouped._jdf, feature_col, label_col, True)
score = self._j_xgb_model.eval(j_rdd, 'test', None, 0, False, j_groups)
return float(score.split('=')[1].strip())

@@ -327,7 +330,7 @@
}


-def tune(folds, stats, train_matrix, num_cv_jobs=5, initial_num_trees=100, final_num_trees=500):
+def tune(folds, stats, train_matrix, num_cv_jobs=5, initial_num_trees=100, final_num_trees=500, iterations=150):
"""Find appropriate hyperparameters for training df

This is far from perfect, hyperparameter tuning is a bit of a black art
@@ -376,13 +379,15 @@
dataset_size = 'large'
elif num_obs > 500000:
dataset_size = 'med'
- else:
+ elif num_obs > 500:
dataset_size = 'small'
+ else:
+ dataset_size = 'xsmall'

# Setup different tuning profiles for different sizes of datasets.
tune_spaces = [.
('initial', {
- 'iterations': 150,
+ 'iterations': iterations,
'space': {
'xlarge': {
'eta': hyperopt.hp.uniform('eta', 0.3, 0.8),
@@ -393,6 +398,7 @@
'min_child_weight', np.log(10), np.log(500), 10),
# % of features to use for each tree. helps prevent overfit
'colsample_bytree': hyperopt.hp.quniform('colsample_bytree', 0.8, 1, .01),
+ 'subsample': hyperopt.hp.quniform('subsample', 0.8, 1, .01),
},
'large': {
'eta': hyperopt.hp.uniform('eta', 0.3, 0.6),
@@ -400,6 +406,7 @@
'min_child_weight': hyperopt.hp.qloguniform(
'min_child_weight', np.log(10), np.log(300), 10),
'colsample_bytree': hyperopt.hp.quniform('colsample_bytree', 0.8, 1, .01),
+ 'subsample': hyperopt.hp.quniform('subsample', 0.8, 1, .01),
},
'med': {
'eta': hyperopt.hp.uniform('eta', 0.1, 0.6),
@@ -407,6 +414,7 @@
'min_child_weight': hyperopt.hp.qloguniform(
'min_child_weight', np.log(10), np.log(300), 10),
'colsample_bytree': hyperopt.hp.quniform('colsample_bytree', 0.8, 1, .01),
+ 'subsample': hyperopt.hp.quniform('subsample', 0.8, 1, .01),
},
'small': {
'eta': hyperopt.hp.uniform('eta', 0.1, 0.4),
@@ -414,6 +422,15 @@
'min_child_weight': hyperopt.hp.qloguniform(
'min_child_weight', np.log(10), np.log(100), 10),
'colsample_bytree': hyperopt.hp.quniform('colsample_bytree', 0.8, 1, .01),
+ 'subsample': hyperopt.hp.quniform('subsample', 0.8, 1, .01),
+ },
+ 'xsmall': {
+ 'eta': hyperopt.hp.uniform('eta', 0.1, 0.4),
+ 'max_depth': hyperopt.hp.quniform('max_depth', 3, 6, 1),
+ # Never use for real data, but convenient for tiny sets in test suite
+ 'min_child_weight': 0,
+ 'colsample_bytree': hyperopt.hp.quniform('colsample_bytree', 0.8, 1, .01),
+ 'subsample': hyperopt.hp.quniform('subsample', 0.8, 1, .01),
}
}[dataset_size]
})
@@ -442,6 +459,7 @@
'large': 6,
'med': 5,
'small': 4,
+ 'xsmall': 3,
}[dataset_size],
'gamma': 0,
'subsample': 1.0,
diff --git a/mjolnir/utilities/data_pipeline.py b/mjolnir/utilities/data_pipeline.py
index ae63de3..b1bc93d 100644
--- a/mjolnir/utilities/data_pipeline.py
+++ b/mjolnir/utilities/data_pipeline.py
@@ -22,18 +22,18 @@
from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql import functions as F
+import requests

SEARCH_CLUSTERS = {
'eqiad': ['http://elastic%d.eqiad.wmnet:9200' % (i) for i in range(1017, 1052)],
'codfw': ['http://elastic%d.codfw.wmnet:9200' % (i) for i in range(2001, 2035)],
+ 'localhost': ['http://localhost:9200'],
}


def run_pipeline(sc, sqlContext, input_dir, output_dir, wikis, samples_per_wiki,
min_sessions_per_query, search_cluster, brokers, ltr_feature_definitions,
- samples_size_tolerance):
- # TODO: Should this jar have to be provided on the command line instead?
- sqlContext.sql("ADD JAR /mnt/hdfs/wmf/refinery/current/artifacts/refinery-hive.jar")
+ samples_size_tolerance, session_factory=requests.Session):
sqlContext.sql("CREATE TEMPORARY FUNCTION stemmer AS 'org.wikimedia.analytics.refinery.hive.StemmerUDF'")

# Load click data from HDFS
@@ -61,7 +61,8 @@
# TODO: While this works for now, at some point we might want to handle
# things like multimedia search from commons, and non-main namespace searches.
indices={wiki: '%s_content' % (wiki) for wiki in wikis},
- min_sessions_per_query=min_sessions_per_query)
+ min_sessions_per_query=min_sessions_per_query,
+ session_factory=session_factory)

# Sample to some subset of queries per wiki
hit_page_id_counts, df_sampled_raw = mjolnir.sampling.sample(
@@ -90,7 +91,7 @@
df_rel = (
mjolnir.dbn.train(df_sampled, num_partitions=dbn_partitions, dbn_config={
'MAX_ITERATIONS': 40,
- 'MIN_DOCS_PER_QUERY': 10,
+ 'MIN_DOCS_PER_QUERY': 5,
'MAX_DOCS_PER_QUERY': 20,
'SERP_SIZE': 20,
'DEFAULT_REL': 0.5})
@@ -163,7 +164,8 @@
# things like multimedia search from commons, and non-main namespace searches.
indices={wiki: '%s_content' % (wiki) for wiki in wikis},
model=ltr_feature_definitions,
- feature_names_accu=fnames_accu)
+ feature_names_accu=fnames_accu,
+ session_factory=session_factory)
else:
if brokers:
df_features = mjolnir.features.collect_kafka(
@@ -184,7 +186,7 @@
# to vary per-wiki. Varied features per wiki would also mean they can't be trained
# together, which is perhaps a good thing anyways.
feature_definitions=mjolnir.features.enwiki_features(),
- feature_names_accu=fnames_accu)
+ feature_names_accu=fnames_accu, session_factory=session_factory)

# collect the accumulator
df_features.cache().count()
diff --git a/mjolnir/utilities/make_folds.py b/mjolnir/utilities/make_folds.py
index 5cbd682..dde25e5 100644
--- a/mjolnir/utilities/make_folds.py
+++ b/mjolnir/utilities/make_folds.py
@@ -131,7 +131,7 @@
df_fold = (
mjolnir.training.tuning.group_k_fold(df, num_folds)
.repartition(200, 'wikiid', 'query')
- .sortWithinPartitions('wikiid', 'query'))
+ .sortWithinPartitions('wikiid', 'query', 'label'))

try:
df_fold.cache()
diff --git a/mjolnir/utilities/training_pipeline.py b/mjolnir/utilities/training_pipeline.py
index 37d4473..55072a2 100644
--- a/mjolnir/utilities/training_pipeline.py
+++ b/mjolnir/utilities/training_pipeline.py
@@ -24,7 +24,10 @@
import sys


-def run_pipeline(sc, sqlContext, input_dir, output_dir, wikis, initial_num_trees, final_num_trees, num_cv_jobs):
+def run_pipeline(
+ sc, sqlContext, input_dir, output_dir, wikis, initial_num_trees,
+ final_num_trees, num_cv_jobs, iterations
+):
with hdfs_open_read(os.path.join(input_dir, 'stats.json')) as f:
stats = json.loads(f.read())

@@ -68,7 +71,8 @@
num_cv_jobs=num_cv_jobs,
train_matrix="train",
initial_num_trees=initial_num_trees,
- final_num_trees=final_num_trees)
+ final_num_trees=final_num_trees,
+ iterations=iterations)

print 'CV test-ndcg@10: %.4f' % (tune_results['metrics']['cv-test'])
print 'CV train-ndcg@10: %.4f' % (tune_results['metrics']['cv-train'])
@@ -143,6 +147,9 @@
help='Number of trees in the final ensemble. If not provided the value from '
+ '--initial-trees will be used. (Default: None)')
parser.add_argument(
+ '-t', '--iterations', dest='iterations', default=150, type=int,
+ help='The number of hyperparameter tuning iterations to perform')
+ parser.add_argument(
'-v', '--verbose', dest='verbose', default=False, action='store_true',
help='Increase logging to INFO')
parser.add_argument(

--
To view, visit https://gerrit.wikimedia.org/r/406070
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I8ad5fe1dbbbd50b897362b44411cfc19650b0390
Gerrit-PatchSet: 1
Gerrit-Project: search/MjoLniR
Gerrit-Branch: master
Gerrit-Owner: EBernhardson <ebernhardson@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits