python - Spark cartesian product


I have to compare coordinates in order to get the distances between them. Therefore I load the data with sc.textFile() and build the Cartesian product. There are about 2,000,000 lines in the text file, so 2,000,000 x 2,000,000 coordinates get compared.

I tested the code with 2,000 coordinates and it worked fine within a few seconds. With the big file, however, it seems to stop at some point and I don't know why. The code looks as follows:

def concat(x, y):
    if(isinstance(y, list) & (isinstance(x, list))):
        return x + y
    if(isinstance(x, list) & isinstance(y, tuple)):
        return x + [y]
    if(isinstance(x, tuple) & isinstance(y, list)):
        return [x] + y
    else:
        return [x, y]

def haversian_dist(tuple):
    lat1 = float(tuple[0][0])
    lat2 = float(tuple[1][0])
    lon1 = float(tuple[0][2])
    lon2 = float(tuple[1][2])
    p = 0.017453292519943295
    a = 0.5 - cos((lat2 - lat1) * p)/2 + cos(lat1 * p) * cos(lat2 * p) * (1 - cos((lon2 - lon1) * p)) / 2
    print(tuple[0][1])
    return (int(float(tuple[0][1])), (int(float(tuple[1][1])), 12742 * asin(sqrt(a))))

def sort_val(tuple):
    dtype = [("globalid", int), ("distance", float)]
    a = np.array(tuple[1], dtype=dtype)
    sorted_mins = np.sort(a, order="distance", kind="mergesort")
    return (tuple[0], sorted_mins)

def calc_matrix(sc, path, rangeval, savepath, name):
    data = sc.textFile(path)
    data = data.map(lambda x: x.split(";"))
    data = data.repartition(100).cache()
    data.collect()
    matrix = data.cartesian(data)
    values = matrix.map(haversian_dist)
    values = values.reduceByKey(concat)
    values = values.map(sort_val)
    values = values.map(lambda x: (x[0], x[1][1:int(rangeval)].tolist()))
    values = values.map(lambda x: (x[0], [y[0] for y in x[1]]))
    dicti = values.collectAsMap()
    hp.save_pickle(dicti, savepath, name)

Even a file with 15,000 entries doesn't work. I know that cartesian() causes O(n^2) runtime, but shouldn't Spark be able to handle this? Or is something wrong? A possible starting point is this error message, but I don't know whether it relates to the actual problem:

16/08/06 22:21:12 WARN TaskSetManager: Lost task 15.0 in stage 1.0 (TID 16, hlb0004): java.net.SocketException: Datenübergabe unterbrochen (broken pipe)
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
    at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:440)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)

16/08/06 22:21:12 INFO TaskSetManager: Starting task 15.1 in stage 1.0 (TID 17, hlb0004, partition 15, PROCESS_LOCAL, 2408 bytes)
16/08/06 22:21:12 WARN TaskSetManager: Lost task 7.0 in stage 1.0 (TID 8, hlb0004): java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(SocketInputStream.java:209)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
    at java.io.DataInputStream.readInt(DataInputStream.java:387)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:139)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

You used data.collect() in your code, which pulls all of the data onto a single machine (the driver). Depending on the memory of that machine, 2,000,000 lines of data might not fit.
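For example, the materialization step in calc_matrix could trigger the cache without copying everything to the driver. A minimal sketch based on your own code, where load_coords is just a hypothetical helper name and count() replaces collect() as the action that forces evaluation:

def load_coords(sc, path):
    # parse the semicolon-separated lines but keep the RDD distributed
    data = sc.textFile(path)
    data = data.map(lambda x: x.split(";"))
    data = data.repartition(100).cache()
    # count() materializes the cache without shipping 2,000,000 rows
    # to the driver the way collect() does
    data.count()
    return data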

Also, I tried to reduce the number of computations by doing joins instead of using cartesian(). (Please note that I generated random numbers with numpy, so the format here may be different from what you have. Still, the main idea is the same.)

import numpy as np
from numpy import arcsin, cos, sqrt

# suppose data consists of latlong pairs;
# we use the indices for pairing up the values
data = sc.parallelize(np.random.rand(10, 2)).zipWithIndex()
data = data.map(lambda (val, idx): (idx, val))

# generate the pairs (e.g. if we have 3 indices [0,1,2], we only
# have to compute the distances of the pairs (0,1), (0,2) & (1,2))
idxs = range(data.count())
indices = sc.parallelize([(i, j) for i in idxs for j in idxs if i < j])

# haversian func (I took the liberty of editing parts of it)
def haversian_dist(latlong1, latlong2):
    lat1, lon1 = latlong1
    lat2, lon2 = latlong2
    p = 0.017453292519943295
    def hav(theta): return (1 - cos(p * theta))/2
    a = hav(lat2 - lat1) + cos(p * lat1)*cos(p * lat2)*hav(lon2 - lon1)
    return 12742 * arcsin(sqrt(a))

joined1 = indices.join(data).map(lambda (i, (j, val)): (j, (i, val)))
joined2 = joined1.join(data).map(lambda (j, ((i, latlong1), latlong2)): ((i, j), (latlong1, latlong2)))
haversianRDD = joined2.mapValues(lambda (x, y): haversian_dist(x, y))
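If you then need the closest points per index, similar to what sort_val and rangeval do in your code, the pair RDD above can be regrouped per point. A rough sketch, where k and the flatMap mirroring are my own assumptions, since each unordered pair (i, j) only appears once in haversianRDD:

k = 10  # hypothetical cut-off, plays the role of your rangeval

# mirror each pair so both endpoints see the distance
by_point = haversianRDD.flatMap(lambda ((i, j), d): [(i, (j, d)), (j, (i, d))])

# keep the k smallest distances per point
nearest = by_point.groupByKey().mapValues(
    lambda dists: sorted(dists, key=lambda (j, d): d)[:k])

nearest_dict = nearest.collectAsMap()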
