python - Spark cartesian product
I have to compare coordinates in order to get the distance between them. Therefore I load the data with sc.textFile() and make a cartesian product. There are about 2,000,000 lines in the text file, so 2,000,000 x 2,000,000 coordinates have to be compared.
I tested the code with 2,000 coordinates and it worked fine within seconds. With the big file, however, it seems to stop at some point and I don't know why. The code looks as follows:
from math import cos, asin, sqrt
import numpy as np

def concat(x, y):
    if isinstance(y, list) & isinstance(x, list):
        return x + y
    if isinstance(x, list) & isinstance(y, tuple):
        return x + [y]
    if isinstance(x, tuple) & isinstance(y, list):
        return [x] + y
    else:
        return [x, y]

def haversian_dist(tuple):
    lat1 = float(tuple[0][0])
    lat2 = float(tuple[1][0])
    lon1 = float(tuple[0][2])
    lon2 = float(tuple[1][2])
    p = 0.017453292519943295
    a = 0.5 - cos((lat2 - lat1) * p) / 2 + cos(lat1 * p) * cos(lat2 * p) * (1 - cos((lon2 - lon1) * p)) / 2
    print(tuple[0][1])
    return (int(float(tuple[0][1])), (int(float(tuple[1][1])), 12742 * asin(sqrt(a))))

def sort_val(tuple):
    dtype = [("globalid", int), ("distance", float)]
    a = np.array(tuple[1], dtype=dtype)
    sorted_mins = np.sort(a, order="distance", kind="mergesort")
    return (tuple[0], sorted_mins)

def calc_matrix(sc, path, rangeval, savepath, name):
    data = sc.textFile(path)
    data = data.map(lambda x: x.split(";"))
    data = data.repartition(100).cache()
    data.collect()
    matrix = data.cartesian(data)
    values = matrix.map(haversian_dist)
    values = values.reduceByKey(concat)
    values = values.map(sort_val)
    values = values.map(lambda x: (x[0], x[1][1:int(rangeval)].tolist()))
    values = values.map(lambda x: (x[0], [y[0] for y in x[1]]))
    dicti = values.collectAsMap()
    hp.save_pickle(dicti, savepath, name)
Even a file with 15,000 entries doesn't work. I know that cartesian causes O(n^2) runtime, but shouldn't Spark handle this? Or is something wrong? As a starting point, here is the error message, but I don't know if it relates to the actual problem:
16/08/06 22:21:12 WARN TaskSetManager: Lost task 15.0 in stage 1.0 (TID 16, hlb0004): java.net.SocketException: Datenübergabe unterbrochen (broken pipe)
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
    at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:440)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)
16/08/06 22:21:12 INFO TaskSetManager: Starting task 15.1 in stage 1.0 (TID 17, hlb0004, partition 15, PROCESS_LOCAL, 2408 bytes)
16/08/06 22:21:12 WARN TaskSetManager: Lost task 7.0 in stage 1.0 (TID 8, hlb0004): java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(SocketInputStream.java:209)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
    at java.io.DataInputStream.readInt(DataInputStream.java:387)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:139)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
You used data.collect() in your code, which basically pulls all of the data onto one machine. Depending on the memory of that machine, 2,000,000 lines of data might not fit very well.
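As an illustration, here is a minimal sketch of keeping things distributed (the file name is hypothetical): if the data.collect() was only meant to materialize the cached RDD, a count() triggers the computation without pulling every row onto the driver.

# hypothetical input path; replace it with your own semicolon-separated file
raw = sc.textFile("coords.txt")
data = raw.map(lambda line: line.split(";")).repartition(100).cache()

# count() forces evaluation and fills the cache, but the rows
# stay distributed on the executors instead of landing on the driver
print(data.count())

# only collect small, aggregated results (e.g. the final dict) at the very end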
Also, I tried to reduce the number of computations by doing joins instead of using cartesian. (Please note that I generated random numbers using numpy, so the format here may be different from what you have. Still, the main idea is the same.)
import numpy as np
from numpy import arcsin, cos, sqrt

# suppose the data consists of latlong pairs
# use indices for pairing up the values
data = sc.parallelize(np.random.rand(10, 2)).zipWithIndex()
data = data.map(lambda (val, idx): (idx, val))

# generate the pairs (e.g. if you have 3 pairs with indices [0,1,2],
# you have to compute the distances of the pairs (0,1), (0,2) & (1,2))
idxs = range(data.count())
indices = sc.parallelize([(i, j) for i in idxs for j in idxs if i < j])

# haversian func (I took the liberty of editing parts of it)
def haversian_dist(latlong1, latlong2):
    lat1, lon1 = latlong1
    lat2, lon2 = latlong2
    p = 0.017453292519943295
    def hav(theta):
        return (1 - cos(p * theta)) / 2
    a = hav(lat2 - lat1) + cos(p * lat1) * cos(p * lat2) * hav(lon2 - lon1)
    return 12742 * arcsin(sqrt(a))

joined1 = indices.join(data).map(lambda (i, (j, val)): (j, (i, val)))
joined2 = joined1.join(data).map(lambda (j, ((i, latlong1), latlong2)): ((i, j), (latlong1, latlong2)))
haversianRDD = joined2.mapValues(lambda (x, y): haversian_dist(x, y))
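If you then want the closest neighbours per point (what the original sort_val / rangeval step was after), one possible follow-up sketch could look like this; rangeval is a hypothetical cut-off here, and I index the tuples explicitly instead of unpacking them in the lambdas:

rangeval = 5  # hypothetical number of nearest neighbours to keep

# emit each distance once per endpoint, so every point sees all of its neighbours
per_point = haversianRDD.flatMap(
    lambda kv: [(kv[0][0], (kv[0][1], kv[1])),   # kv = ((i, j), dist)
                (kv[0][1], (kv[0][0], kv[1]))])

# per point, keep only the rangeval closest neighbour indices
nearest = per_point.groupByKey().mapValues(
    lambda neighbours: [j for j, d in sorted(neighbours, key=lambda t: t[1])[:rangeval]])

result = nearest.collectAsMap()  # one small entry per point, safe to collect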