Dask Parallel and Distributed Computing | SciPy 2016 | Matthew Rocklin



Good morning, everyone; I see a good crowd. Again, I'm Matt, this is Jim; we're core developers on Dask, among others, and we're going to talk about Dask very broadly. Dask is a Python library for parallel computing: it works well in a single notebook and also on a large cluster. This talk comes in three parts. The first is that Dask parallelizes NumPy and pandas, providing alternative parallel versions of those libraries. Most of our talks focus on this topic; it's a very flashy topic and usually goes over well with this crowd, but we actually wanted to dive into the nitty-gritty details of two other aspects of Dask. They're slightly less flashy, but I think they'll really engage this audience. Those parts are ad hoc parallel computing, that is, how we make parallel programs that don't fit into a big-list, big-dataframe, or big-array abstraction, which ends up coming up pretty often, especially with library developers; and distributed computing, which is a new thing for Dask. I'll talk about the first bit and the last bit; Jim will talk about ad hoc parallel computing.

First, a quick flashy demo of dask.dataframe; this is pandas running on a cluster. You can follow along if you want: go to scipy2016.jupyter.org and you can get your very own cluster, with eight nodes of two cores each, running the notebook I'm about to show. Thanks go to Ben Zaitlen and Min Ragan-Kelley for setting that up for us. You press this button (it's a very satisfying button) and a cluster is spun up for you; give it 120 seconds.

OK, so, very briefly: I have a bunch of CSV data on S3 or HDFS or some other distributed file system, and I want to read it with pandas. This is the New York City taxi cab dataset; it's around 20 gigabytes on disk in a bunch of CSV files, or around 60 gigabytes in RAM once it blows up inside pandas. I can read a little bit of it with pandas using the nrows=5 keyword argument, but I can't read it all; it would blow up RAM. This is a common problem. Fortunately there's dask.dataframe. Dask.dataframe is going to load this data for us on a cluster of ten machines, each with eight cores. What it's doing is breaking those twelve CSV files up into 365 little pandas dataframes that it loads across the cluster, in RAM, on each of those machines. (In the notebook online there would be just one file; you can expand that if you want.) While that's happening we can go look at what we have, and that object is a Dask dataframe, which looks a whole lot like a pandas dataframe but is actually composed of those 365 pandas dataframes on different machines. Now, parallel algorithms are hard sometimes, and dask.dataframe does all the coordination of those pandas dataframes for you. So one Dask dataframe is one logical collection of many pandas dataframes, either on your laptop, on disk, or on a cluster of machines communicating over a network.

You may have noticed this very flashy thing to my right. This is the web UI; you'll be seeing it a bit during the talk. It shows the execution of what's happening on the cluster: I have 80 cores here, and this shows what's happening on each of them over time. The purple was loading data from S3, and the yellow was calling pandas.read_csv on those bytes. As I do things on the left you can see what's happening on the cluster on the right; it's a nice way to visualize what's going on. And so this dask.dataframe object looks a lot like a pandas dataframe, and it computes things just about as well.
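Roughly, the opening cells of that notebook look like the sketch below; the S3 bucket path is a hypothetical stand-in for wherever the taxi CSVs actually live.

```python
import dask.dataframe as dd

# Read many CSV files on S3 as one logical dataframe (path is hypothetical)
df = dd.read_csv('s3://nyc-taxi-data/2015/yellow_*.csv')

df.head()                            # runs pandas on just the first block
df.passenger_count.sum().compute()   # the full parallel computation
```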
When we do things like compute the sum of the passenger count, we're actually computing lots of little pandas sums across the cluster; this red bit is communicating all that data back to one machine and computing a sum of sums. Sums are pretty easy; there are things like groupby which are more complex, and this goes on pretty well. Dask.dataframe implements groupbys, joins, datetime operations, rolling operations, reductions, elementwise things: a lot of things, but not all things. Things like pivot tables are in the works; the more exotic pandas functionality, no chance. And while it really is just a bunch of pandas dataframes underneath, it was actually developed alongside the pandas developers themselves: Jeff Reback did a lot of work on the pandas side, and Masaaki Horikoshi did a lot of work on the Dask side.

Let's do some fun and exciting computations. Here we're pulling out some bad rows, we're grouping by the hour of day, and we're then computing a new column, the tip fraction column. So how well do New Yorkers tip by hour of day? Do they tip 20% at 5 p.m. or 12% at 4 a.m.? What you find, very nicely, is that New Yorkers are very, very generous very early in the morning: around 38% on average over all of 2015. My colleagues in New York City tell me this is last call at bars. So this is the pandas experience, now on a cluster. We could give a whole talk on this; we're not going to, because there are other things we should talk about.

This exact same experience also exists for NumPy; that was actually the first system we built, and it's very mature at this point. There's also dask.bag for lists, a PySpark-like thing. Those are all nice, but what we found in interacting with scientists with more interesting workloads, or with Continuum clients, is that often they have problems that are definitely very parallelizable, but they don't easily fit into a dataframe abstraction or an array abstraction; they're more custom. This actually comes up a lot: you all do a lot of weird things. This community in particular does just really strange things. If you look at how pandas is used, people aren't just using the pandas API; it's the pandas API plus a lot of general computation, a lot of general Python, around that API. It's that general stuff surrounding pandas and NumPy that's really hard to capture in the big data tools, and that's really the challenge we're trying to meet.

What we found is this: dask.dataframe and dask.array live on top of Dask core. Where the dataframe and the array weren't easily applicable, it was very awkward to shove problems into those abstractions, but the core bits, the scheduler and the graph specification, still worked. Originally Dask was designed around dask.array: we had a scheduler (a task scheduler I'll talk about in a while), on top of that a specification for how to encode task graphs, and on top of that we built dask.array. This separation was really nice; it allowed us to very quickly produce dask.bag and dask.dataframe afterwards, and the plan was to keep building things like that. But again, interacting with lots of people: some people are totally happy. Climate scientists and atmospheric scientists dealing with lots of big arrays are extremely happy with this project, especially through the xray project from Stephan Hoyer and others. But others less so; we ended up finding that they were just using the graph spec and the scheduler and ditching everything else.
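A minimal sketch of that tip-fraction computation, assuming illustrative column names (pickup_datetime, tip_amount, fare_amount) rather than the exact taxi schema:

```python
import dask.dataframe as dd

# Hypothetical path and column names, for illustration
df = dd.read_csv('s3://nyc-taxi-data/2015/yellow_*.csv',
                 parse_dates=['pickup_datetime'])

df = df[(df.fare_amount > 0) & (df.tip_amount >= 0)]   # drop bad rows
tip_fraction = df.tip_amount / df.fare_amount          # elementwise, lazy
hour = df.pickup_datetime.dt.hour                      # datetime accessor

tip_fraction.groupby(hour).mean().compute()            # mean tip fraction by hour
```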
So what we've done is make a very small API, dask.delayed, inspired by the joblib delayed API but extended, and on top of it we've been able to build other libraries. For example, over the last few weeks Jim has been building a parallel clone of a subset of scikit-learn, and that's been a lot easier to build than the earlier things, because dask.delayed is a very thin API on top of the scheduler. It's now very cheap to build parallel libraries. This is useful both for scientists doing one-off things and for library developers who want to parallelize their libraries. Our job is literally to help the scientific Python community parallelize; that's what we get paid to do full time. So those of you who work on libraries you think should be parallelized, there's a good need for that: please come talk to us. OK, Jim is going to talk about dask.delayed for a bit, and I'm going to talk about the scheduler afterwards.

One more thing first: the nice thing about Dask is that this separation allows us to switch out schedulers. Dask was really designed to work on a single machine, to unlock the power of your laptop (your MacBook Pro is way more powerful than a single core), and now we've switched over to distributed computing, so we can run on thousand-node clusters. OK, over to Jim.

[Jim] So as Matt said, we have a newish API called dask.delayed. It's really useful for creating arbitrary graphs without having to muck with the graphs themselves. It's a very simple interface, a little bit like Numba with numba.jit: there's just dask.delayed, that's the only function you need, a single-function interface, and it plays really well with existing code, with a few caveats. If you pass delayed a function and then call that function, you get a lazy object that hasn't computed yet: a Delayed object. And if you call delayed on data, you also get a lazy object that pretends to be your data; a lot of the methods and operators work, but they're all lazy. This will be a lot clearer with examples. I'm going to be running a distributed cluster on my local machine; I've just set this up on my MacBook. Let's make that bigger. Wonderful.

So, three functions here: add, mul, and inc, and we've added a random sleep to make them slower, because they're very simple functions and we'd like them to take some time. If I call add on 1 and 2, x is a Delayed object; it's lazy, and if I call compute on it, I get the actual result. We can chain this: call a couple of functions in a row, inc(1) and mul(1, 2), and then add on the results of both of those, and looking at the graph you can see the two operations joined together by the add at the end. Finally, calling compute, it runs over here on the distributed scheduler and we have the result.

Delayed plays really nicely with loops. Here we're going to do the exact same operations as before, but in a loop, and at the very end we're going to sum them all together. Here we're calling delayed inline, so you don't have to use the decorator form I used above. Again, total is a Delayed object.
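A minimal sketch of those cells; the sleeps and the repeated-input list are illustrative, not the exact notebook contents:

```python
import time
import random
from dask import delayed

def inc(x):
    time.sleep(random.random())   # artificial delay so the work is visible
    return x + 1

def add(x, y):
    time.sleep(random.random())
    return x + y

x = delayed(add)(1, 2)   # a lazy Delayed object; nothing has run yet
x.compute()              # 3 -- now it executes on the scheduler

# The same pattern in a loop, calling delayed inline and summing at the end.
# Note that every iteration computes inc(1) with the same argument.
data = [1, 1, 1, 1]
total = delayed(sum)([delayed(inc)(n) for n in data])
total.compute()          # 8
```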
Looking at the graph, one really interesting thing you can see is that this inc call, which we've called four times in the loop, only appears once. We do deterministic hashing on the arguments, and that's what this pure=True means; if you don't want the hashing you can say pure=False, but when it's true Dask will share intermediates. It's able to determine that we only need to run inc(1) once, because inc(1) always gives the same result, and that comes in handy for finding shared expressions nested deep in code that already exists. And again, we can run this. Getting a little more complicated, with two loops looping over both parameters, we get a nice bigger graph. Again, this all looks very much like normal Python; you probably have code that looks like this. And we run it.

These have all been fairly simple operations, very much like the common map pattern you're familiar with. Something a little more complicated would be a tree reduction. So far we've been gathering all of our results together and bringing them back to one machine to sum them; if those results are big, that can be expensive. Something nicer is to gather and reduce them in small pairwise bits, so you're not sending everything to one machine at once. That's what tree reductions are for. Spark has this, and Dask uses it internally, but it's also pretty easy to write yourself using normal Python code: loop through grabbing pairs of results, add each pair together, and append the sums to a new list, repeating until the list has only one element, as in the sketch below. Again, this is all normal Python and doesn't look terribly complicated; you get a nice big graph where everything is pulled together in pairs, and it runs.
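A sketch of that pairwise tree reduction in plain Python, with a made-up list of Delayed inputs:

```python
from dask import delayed

def add(x, y):
    return x + y

# A made-up list of Delayed values standing in for earlier results
results = [delayed(add)(i, 1) for i in range(16)]

# Reduce pairwise until one value remains
while len(results) > 1:
    paired = []
    for i in range(0, len(results), 2):
        if i + 1 < len(results):
            paired.append(delayed(add)(results[i], results[i + 1]))
        else:
            paired.append(results[i])   # odd element carries over to the next round
    results = paired

total = results[0].compute()   # 136
```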
As I said before, there are a couple of caveats. Delayed works very well with most code, but you cannot iterate over a Delayed object, because we don't know how long it is until we've computed it, and you can't use a Delayed object in an if statement, because we don't know whether it's true or false until we've computed it. But everything else, methods, operators, function calls, should just work. And now Matt's going to talk about how the distributed scheduler works.

[Matt] One of the nice things about dask.delayed is that it puts all of the algorithmic power in your hands. For example, this tree reduction is something a user might ask the dask.dataframe developers for ("hey, I want tree reductions"), and we'd grumble a little and eventually add it; but it's also something that probably 90% of this room, after a minute of looking at the code, could write or understand. It's very accessible Python. So dask.delayed lets algorithm developers think about algorithms, without forcing anything on them about sockets or resiliency or cluster computing or authentication or whatever. It's a really nice separation between infrastructure and algorithms.

So dask.delayed lets you author arbitrary graphs, and it's then the job of the scheduler to execute them. This is a trace of the single-machine threaded scheduler walking through a machine learning graph, and it's tricky; there's a lot to think about here. For example, take this little computation: call f twice, then call g on both results. If I have two computers, I may choose to run the two fs in parallel on different machines, and then I need to move either x to y or y to x; I might look at the sizes of those data to determine which one to move (probably the smaller one). There may be further complications: maybe f is actually really fast and it's better to run both calls on one machine; maybe there's special hardware involved, maybe g requires a GPU and we need to make sure g runs on the right machine. A lot of interesting things come up here; this is the general topic of task scheduling.

Dask has had a few different task schedulers. There's the single-machine scheduler, which has been around since we first started Dask; the first one was literally 14 lines long. It's grown into a moderate 600 lines of code, which is still pretty accessible and pretty hackable, and it's been stable throughout the last year. There are people running on this scheduler right now; I'm entirely confident that it's under heavy use, and it has not changed much over the last year. It's pretty rock solid at this point. It attempts mostly to minimize RAM and to leverage your hard drive: you can easily process hundred-gigabyte datasets as long as you carefully walk through the graph and delete intermediate values as quickly as possible, and this scheduler has lots of heuristics to make sure we walk through the graph in nice ways with low overhead. And again, it's been hit by a lot of real-world use cases: dask.array, dask.dataframe, the xray community is pretty happy on it, Continuum clients and consulting projects.

What's new is the distributed scheduler; we've been working on it since around September of last year. The distributed scheduler has three different components. There's a central scheduler, which lives on one computer; there's the client, the user, which is us, sitting at our laptops in our Jupyter notebooks; and then there are a lot of workers. The scheduler coordinates the workers to do the work, and the user submits little graphs. Here the client authors some graph; this graph might be authored implicitly through dask.array or dask.dataframe, so the user doesn't know they're making a graph, they're just hitting compute. The client sends that graph up to the scheduler, and the scheduler farms the work out to the workers. Here we have three tasks available at one time, and we see those tasks allocated to the workers; as tasks finish (blue is when a task finishes) we're able to schedule more work.

The scheduler is also data-locality aware: these two tasks finished on this worker, so it knew that the dependent task was now available and scheduled it onto that same worker, because the two inputs live there. It keeps track of where the data is, trying to minimize data movement. The client (there are three little blue circles down here, I'm sure that's easy to see) is able to track what's happening on the scheduler in real time, as the computation is happening. So it's not "submit a graph, wait for an answer"; it's "submit a graph and watch what's happening", and based on what's happening, the client can actually submit other graphs to the scheduler. The whole thing is asynchronous; there's a real-time conversation happening between the scheduler and the client all the time, and we can adapt our computation as it's happening. More can happen, too: as a task's intermediate value is no longer needed, the scheduler can delete those values, keeping RAM available and free; it's quite good at this because of the history with the single-machine scheduler, which prioritized exactly that. The scheduler is also multi-user capable: we can have multiple users all hammering on the same compute resources, with everything nicely shared and protected.
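To make that client-scheduler conversation concrete, here is a minimal sketch using futures through dask.distributed's Client (around the time of this talk the class was called Executor); the scheduler address is hypothetical:

```python
from dask.distributed import Client

client = Client('scheduler-host:8786')   # hypothetical scheduler address

def inc(x):
    return x + 1

# Fire off work; each call returns a Future immediately
futures = client.map(inc, range(100))

# Keep building on those futures without waiting for them to finish
total = client.submit(sum, futures)

total.result()   # block only when we actually want the answer: 5050
```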
Because everything is hashed, it's very rare to get conflicts. And there's load balancing, there's work stealing; pretty much every optimization you can think of is happening, as long as we can do it in constant time, so the dynamic scheduler stays very fast. Overhead is around 200 microseconds per task, around 5,000 tasks per second; if your tasks take around one second each, it can saturate around 5,000 cores. That's roughly the limit. Over time we compute more and more complex graphs, we do more data science and discovery, and the scheduler keeps track of everything; interesting things happen here. The scheduler does garbage collection: if the client goes away or doesn't want data anymore, the scheduler will nicely remove that data and keep everything clean, so it can keep running for a long time. If a worker goes down, we can lose some data; here we've lost some on the left, but it's OK, because the scheduler keeps track of how to produce that data. It has a plan to rebuild it, and so it can keep building that computation up with the remaining workers. So it's elastic, it's resilient, it's data-local, it's multi-user aware, it's asynchronous. It works well in the traditional Hadoop file system ecosystem; it also works very well on traditional high-performance computing and cluster systems. If you have SGE or LSF, it's very happy; if you have YARN or HDFS, it's very happy. It runs well in lots of these settings; we're supporting both the traditional scientific computing community and the big data Hadoop/Spark community. Everything here is pure Python.

Two notes. First, it was really, really convenient when we were building this to have dask.array, dask.dataframe, and dask.delayed around: we had a huge set of algorithms with which to stress the scheduler from historic single-machine use. We've made the scheduler smart enough to handle all those use cases, but it's not optimized specifically for arrays or for dataframes; all the optimizations are small and local. And so it's very common now to be presented with a completely new algorithm and have everything just work, because we had to make lots of little local optimizations rather than ones that only make arrays and dataframes faster. It's very common for someone to show up with a problem, just try it, and have it work out; not all the time, but it's been going well.

Second, it is less concise: it's around 3,000 lines of code, a Tornado application. This is a slight problem because this community doesn't have a huge amount of Tornado experience generally, though there are counterexamples: a lot of Jupyter is written in Tornado, a lot of Bokeh is written in Tornado. We've tried pretty hard to keep all of the logic bits, like "which worker should this task run on", Tornado-free, so it's very hackable. It's intentionally very hackable: people in this room can make the scheduler better; this is all completely accessible to you. It's pure Python, BSD-licensed, written in Python 2/Python 3 standard-library code; it even runs on PyPy. It's very, very accessible.

OK, to get started: it's on conda-forge, it's on the default channels, it's on pip. You can run it with two lines and it'll create a local scheduler for you; or, if you want to set up your own scheduler on your own cluster, you just create a scheduler on one machine and start workers on the other machines pointing back to that scheduler. It is easy to deploy.
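A minimal sketch of both setups; the cluster address below is hypothetical, and dask-scheduler and dask-worker are the command-line entry points that ship with the distributed package:

```python
from dask.distributed import Client

# On a single machine, these two lines create a local scheduler
# and workers and connect to them:
client = Client()

# On a cluster, run `dask-scheduler` on one machine and
# `dask-worker 192.168.1.100:8786` on each of the others,
# then point the client at the scheduler:
client = Client('192.168.1.100:8786')
```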
OK, Jim is now going to give you a last hurrah with some machine learning stuff.

[Jim] I'm just going to quickly run all of these cells and talk about them as they run. I've been spending some time here working on making Dask work with scikit-learn, and I've been making a library which lives right here under jcrist/dask-learn; it might move over to the dask org eventually. It's just a proof of concept, so if you know things about machine learning, please come talk to me; I'd like to get your opinions. This is running on my local machine with four workers, and we're working with the airline dataset: flight data from 1987 onward, though we're actually only looking at the 2000s. I've loaded it all from CSVs, with some read_csv operations here and a little pandas munging, and then, here's a little delayed marker, this is custom work to one-hot encode some of the categorical columns so we can get a big sparse array. Down here, from the dask-learn library, we're creating some matrices and then wrapping scikit-learn estimators to do the fitting, and then we're doing a grid search across multiple machines, data-parallel across multiple dataframes. So we're fitting an SGD classifier, many of them, and averaging them together, across a grid of alpha and loss, with three CV splits. I'll let this run, and at the very end we should actually get the result. This plays very well with the scikit-learn API, which is what it was designed to match; it's only a couple of functions long, and it was all written using dask.delayed wherever possible, just to show how flexible Dask can be for things that aren't big collections. We'll let this run in the background while Matt concludes.
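The notebook itself isn't reproduced here, but the general shape of a grid search written with dask.delayed looks something like this sketch: plain scikit-learn wrapped in delayed over toy data, not the dask-learn API itself.

```python
import dask
from dask import delayed
from sklearn.datasets import make_classification
from sklearn.linear_model import SGDClassifier

# Toy data standing in for the one-hot-encoded airline matrices
X, y = make_classification(n_samples=1200, random_state=0)
X_train, y_train, X_test, y_test = X[:1000], y[:1000], X[1000:], y[1000:]

@delayed
def fit(alpha, loss, X, y):
    return SGDClassifier(alpha=alpha, loss=loss).fit(X, y)

@delayed
def score(model, X, y):
    return model.score(X, y)

# Lazily build the whole grid of fits and scores
grid = {}
for alpha in [1e-4, 1e-3, 1e-2]:
    for loss in ['hinge', 'modified_huber']:
        model = fit(alpha, loss, X_train, y_train)
        grid[alpha, loss] = score(model, X_test, y_test)

# Evaluate everything in parallel and pick the best parameters
keys = list(grid)
values = dask.compute(*[grid[k] for k in keys])
best_params = keys[values.index(max(values))]
```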
[Matt] OK, one more minute. We saw that Dask supplies parallel NumPy and pandas, and that you can build ad hoc parallel algorithms using dask.delayed, and we heard about how those run intelligently on a cluster or on a single machine. I want to break out some acknowledgments here. There's a huge number of people who work on Dask, around 70 individuals, which is really nice. Those individuals range from people submitting random bug reports (a lot of them are in this room) to core library developers from NumPy, from pandas, from xray, from Jupyter, from scikit-learn, all very active in the design conversations around this library. It's really been a community effort; there's no way we could have done this on our own. I also want to thank Continuum Analytics: it's really nice to work on open source and also get paid for it; it's a really sweet gig. We've had a lot of funding historically from the DARPA XDATA program, and we're very excited to announce that the Moore Foundation has generously decided to fund continued work on Dask and on Numba. We're very grateful for that, both because it's funding for really important software problems and because it's a great show of trust; the Moore Foundation has been really good to this community historically, and it's a real honor to be part of that crowd. So thank you all for your time. There are some Dask stickers around here if you're interested; I'll have some as well, we'll be around outside, and there's a Continuum happy hour tonight at the offices. Thank you.

[Q&A] The question is: what was the library I was using for the machine learning stuff? That was the proof-of-concept machine learning library called dask-learn; it's up on GitHub.

The question is whether we also use scikit-learn. Yes: dask.array is built using NumPy, we compose NumPy operations, and dask-learn is built using scikit-learn; it's a thin wrapper around scikit-learn operations. We're just trying to leverage what already exists. Come talk to us and we'll work that up.

The question is whether there are restrictions on the kinds of functions delayed should accept. We can use any Python function; it's up to you to write that Python function. We generally assume that the function is pure, that it has no side effects.

The question is how easy it is to dynamically add and remove workers. Very easy: you can make a new worker and point it at the scheduler at any time, and you can remove a worker at any time.

The question is whether we have any plans to support GeoPandas. The answer is that we're completely incapable of supporting any library without that library's support: we needed Jeff Reback and Masaaki Horikoshi to do pandas. We would love to integrate with GeoPandas if that library is willing to step up, so if you're part of that library, come talk to us; that general invitation goes out to everyone. GeoPandas is a really important library.

OK, we probably have time for one more question while the speakers for the next talk come up and get set up. The question is: in the distributed scheduler, what parts are still centralized? The central scheduler is completely centralized; the workers are actually quite dumb. All the logic lives in the scheduler, entirely in one spot, so if that machine goes down you've lost all of your work. Most systems at this scale follow that model; it's not a decentralized system.

Great, thank you. We'll be outside, and again, if you're coming to the happy hour, please come talk to us. We have very fun stickers; they're very new, I'm very excited, you should come get them.

4 Comments

  1. Zapy said:

    The way you switch from one speaker to the other is like a rap concert 😉
    Nice talk

    June 30, 2019
  2. vaila ruthvik said:

    OMG python is totally eating up everything in its way!! I have been using a for loop to do grid search on neural networks using numpy for EMNIST classification, and I also have access to a 32-node cluster. All I need is dask, and now I have it.

    June 30, 2019
  3. Enthought said:

    See the complete SciPy 2016 Conference talk & tutorial playlist here: https://www.youtube.com/playlist?list=PLYx7XA2nY5Gf37zYZMw6OqGFRPjB1jCy6

    June 30, 2019
  4. Matthew Rocklin said:

    Other Dask videos available here: https://www.youtube.com/playlist?list=PLRtz5iA93T4PQvWuoMnIyEIz1fXiJ5Pri
    Documentation: http://dask.readthedocs.org/en/latest/ http://distributed.readthedocs.org/en/latest/
    Github: http://github.com/dask/dask http://github.com/dask/distributed
    Jim's blog: http://jcrist.github.io/blog.html
    Matt's blog: http://matthewrocklin.com/blog

    June 30, 2019
